You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Bauddhik Anand <bd...@gmail.com> on 2023/05/15 04:04:52 UTC

How to pass the TLS certs to the latest version of flink-connector-pulsar

I am trying to connect my Flink application to a Pulsar topic for ingesting
data. The topic is active and i am able to ingest the data via a normal
Java application.

When i try to use the Flink application to ingest the data from the same
topic, using the latest version of flink-connector-pulsar i.e 4.0.0-1.17, i
do not find in the documenation anywhere how to pass to pass the TLS certs.

I tried with below code:


final StreamExecutionEnvironment envn =
StreamExecutionEnvironment.getExecutionEnvironment();

Configuration config = new Configuration();

            config.setString("pulsar.client.authentication","tls");
            config.setString("pulsar.client.tlsCertificateFilePath",tlsCert);
            config.setString("pulsar.client.tlsKeyFilePath",tlsKey);
            config.setString("pulsar.client.tlsTrustCertsFilePath",tlsTrustCert);

 PulsarSource<String> pulsarSource = PulsarSource.builder()
                    .setServiceUrl("serviceurl")
                    .setAdminUrl("adminurl")
                    .setStartCursor(StartCursor.earliest())
                    .setTopics("topicname")
                    .setDeserializationSchema(new SimpleStringSchema())
                    .setSubscriptionName("test-sub")
                    .setConfig(config)
                    .build();


pulsarStream.map(new MapFunction<String, String>() {
                private static final long serialVersionUID =
-999736771747691234L;

                public String map(String value) throws Exception {
                  return "Receiving from Pulsar : " + value;
                }
              }).print();


            envn.execute();


As per documentation i did not find any inbuilt method in the PulsarSource
class to pass the TLS certs, i tried using the PulsarClient options as
config and pass it to PulsarSource as option.

This doesn't seem to work, as when i try to deploy the app, the Flink job
is submitted and JobManager throws the below error.

Caused by: sun.security.validator.ValidatorException: PKIX path
building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to
find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
    at sun.security.validator.PKIXValidator.engineValidate(Unknown
Source) ~[?:?]
    at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]


Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown
Source) ~[?:?]
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown
Source) ~[?:?]
    at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
    at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]

I have already verified the certs path and it is correct, also i am using
the same path as a volume mount for my other apps and they work fine.

My question is :

How i can pass the certs to the latest version of the
*flink-connector-pulsar* i.e *4.0.0-1.17*

Re: How to pass the TLS certs to the latest version of flink-connector-pulsar

Posted by Bauddhik Anand <bd...@gmail.com>.
Thanks for your response.

Yes service url, admin url, topic etc are correct and have correct prefix
as well.

It is working with normal java application.

I am not sure how i can pass the TLS certs.




On Tue, 16 May, 2023, 14:52 Weihua Hu, <hu...@gmail.com> wrote:

> Hi,
>
> Did you try set 'serviceurl' starts with "pulsar+ssl://"? [1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#pulsar-client-serviceurl
>
> Best,
> Weihua
>
>
> On Mon, May 15, 2023 at 7:22 PM Bauddhik Anand <bd...@gmail.com> wrote:
>
> > Can someone please help with this?
> >
> > On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bd...@gmail.com> wrote:
> >
> > > I am trying to connect my Flink application to a Pulsar topic for
> > > ingesting data. The topic is active and i am able to ingest the data
> via
> > a
> > > normal Java application.
> > >
> > > When i try to use the Flink application to ingest the data from the
> same
> > > topic, using the latest version of flink-connector-pulsar i.e
> > 4.0.0-1.17, i
> > > do not find in the documenation anywhere how to pass to pass the TLS
> > certs.
> > >
> > > I tried with below code:
> > >
> > >
> > > final StreamExecutionEnvironment envn =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > >
> > > Configuration config = new Configuration();
> > >
> > >             config.setString("pulsar.client.authentication","tls");
> > >
> >  config.setString("pulsar.client.tlsCertificateFilePath",tlsCert);
> > >             config.setString("pulsar.client.tlsKeyFilePath",tlsKey);
> > >
> >  config.setString("pulsar.client.tlsTrustCertsFilePath",tlsTrustCert);
> > >
> > >  PulsarSource<String> pulsarSource = PulsarSource.builder()
> > >                     .setServiceUrl("serviceurl")
> > >                     .setAdminUrl("adminurl")
> > >                     .setStartCursor(StartCursor.earliest())
> > >                     .setTopics("topicname")
> > >                     .setDeserializationSchema(new SimpleStringSchema())
> > >                     .setSubscriptionName("test-sub")
> > >                     .setConfig(config)
> > >                     .build();
> > >
> > >
> > > pulsarStream.map(new MapFunction<String, String>() {
> > >                 private static final long serialVersionUID =
> > -999736771747691234L;
> > >
> > >                 public String map(String value) throws Exception {
> > >                   return "Receiving from Pulsar : " + value;
> > >                 }
> > >               }).print();
> > >
> > >
> > >             envn.execute();
> > >
> > >
> > > As per documentation i did not find any inbuilt method in the
> > PulsarSource
> > > class to pass the TLS certs, i tried using the PulsarClient options as
> > > config and pass it to PulsarSource as option.
> > >
> > > This doesn't seem to work, as when i try to deploy the app, the Flink
> job
> > > is submitted and JobManager throws the below error.
> > >
> > > Caused by: sun.security.validator.ValidatorException: PKIX path
> building
> > failed: sun.security.provider.certpath.SunCertPathBuilderException:
> unable
> > to find valid certification path to requested target
> > >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source)
> > ~[?:?]
> > >     at sun.security.validator.PKIXValidator.engineValidate(Unknown
> > Source) ~[?:?]
> > >     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
> > >     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source)
> > ~[?:?]
> > >
> > >
> > > Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
> > unable to find valid certification path to requested target
> > >     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown
> > Source) ~[?:?]
> > >     at
> > sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown
> > Source) ~[?:?]
> > >     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
> > >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source)
> > ~[?:?]
> > >
> > > I have already verified the certs path and it is correct, also i am
> using
> > > the same path as a volume mount for my other apps and they work fine.
> > >
> > > My question is :
> > >
> > > How i can pass the certs to the latest version of the
> > > *flink-connector-pulsar* i.e *4.0.0-1.17*
> > >
> >
>

Re: How to pass the TLS certs to the latest version of flink-connector-pulsar

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

Did you try set 'serviceurl' starts with "pulsar+ssl://"? [1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#pulsar-client-serviceurl

Best,
Weihua


On Mon, May 15, 2023 at 7:22 PM Bauddhik Anand <bd...@gmail.com> wrote:

> Can someone please help with this?
>
> On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bd...@gmail.com> wrote:
>
> > I am trying to connect my Flink application to a Pulsar topic for
> > ingesting data. The topic is active and i am able to ingest the data via
> a
> > normal Java application.
> >
> > When i try to use the Flink application to ingest the data from the same
> > topic, using the latest version of flink-connector-pulsar i.e
> 4.0.0-1.17, i
> > do not find in the documenation anywhere how to pass to pass the TLS
> certs.
> >
> > I tried with below code:
> >
> >
> > final StreamExecutionEnvironment envn =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > Configuration config = new Configuration();
> >
> >             config.setString("pulsar.client.authentication","tls");
> >
>  config.setString("pulsar.client.tlsCertificateFilePath",tlsCert);
> >             config.setString("pulsar.client.tlsKeyFilePath",tlsKey);
> >
>  config.setString("pulsar.client.tlsTrustCertsFilePath",tlsTrustCert);
> >
> >  PulsarSource<String> pulsarSource = PulsarSource.builder()
> >                     .setServiceUrl("serviceurl")
> >                     .setAdminUrl("adminurl")
> >                     .setStartCursor(StartCursor.earliest())
> >                     .setTopics("topicname")
> >                     .setDeserializationSchema(new SimpleStringSchema())
> >                     .setSubscriptionName("test-sub")
> >                     .setConfig(config)
> >                     .build();
> >
> >
> > pulsarStream.map(new MapFunction<String, String>() {
> >                 private static final long serialVersionUID =
> -999736771747691234L;
> >
> >                 public String map(String value) throws Exception {
> >                   return "Receiving from Pulsar : " + value;
> >                 }
> >               }).print();
> >
> >
> >             envn.execute();
> >
> >
> > As per documentation i did not find any inbuilt method in the
> PulsarSource
> > class to pass the TLS certs, i tried using the PulsarClient options as
> > config and pass it to PulsarSource as option.
> >
> > This doesn't seem to work, as when i try to deploy the app, the Flink job
> > is submitted and JobManager throws the below error.
> >
> > Caused by: sun.security.validator.ValidatorException: PKIX path building
> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable
> to find valid certification path to requested target
> >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source)
> ~[?:?]
> >     at sun.security.validator.PKIXValidator.engineValidate(Unknown
> Source) ~[?:?]
> >     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
> >     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source)
> ~[?:?]
> >
> >
> > Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
> unable to find valid certification path to requested target
> >     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown
> Source) ~[?:?]
> >     at
> sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown
> Source) ~[?:?]
> >     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
> >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source)
> ~[?:?]
> >
> > I have already verified the certs path and it is correct, also i am using
> > the same path as a volume mount for my other apps and they work fine.
> >
> > My question is :
> >
> > How i can pass the certs to the latest version of the
> > *flink-connector-pulsar* i.e *4.0.0-1.17*
> >
>

Re: How to pass the TLS certs to the latest version of flink-connector-pulsar

Posted by Bauddhik Anand <bd...@gmail.com>.
Can someone please help with this?

On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bd...@gmail.com> wrote:

> I am trying to connect my Flink application to a Pulsar topic for
> ingesting data. The topic is active and i am able to ingest the data via a
> normal Java application.
>
> When i try to use the Flink application to ingest the data from the same
> topic, using the latest version of flink-connector-pulsar i.e 4.0.0-1.17, i
> do not find in the documenation anywhere how to pass to pass the TLS certs.
>
> I tried with below code:
>
>
> final StreamExecutionEnvironment envn = StreamExecutionEnvironment.getExecutionEnvironment();
>
> Configuration config = new Configuration();
>
>             config.setString("pulsar.client.authentication","tls");
>             config.setString("pulsar.client.tlsCertificateFilePath",tlsCert);
>             config.setString("pulsar.client.tlsKeyFilePath",tlsKey);
>             config.setString("pulsar.client.tlsTrustCertsFilePath",tlsTrustCert);
>
>  PulsarSource<String> pulsarSource = PulsarSource.builder()
>                     .setServiceUrl("serviceurl")
>                     .setAdminUrl("adminurl")
>                     .setStartCursor(StartCursor.earliest())
>                     .setTopics("topicname")
>                     .setDeserializationSchema(new SimpleStringSchema())
>                     .setSubscriptionName("test-sub")
>                     .setConfig(config)
>                     .build();
>
>
> pulsarStream.map(new MapFunction<String, String>() {
>                 private static final long serialVersionUID = -999736771747691234L;
>
>                 public String map(String value) throws Exception {
>                   return "Receiving from Pulsar : " + value;
>                 }
>               }).print();
>
>
>             envn.execute();
>
>
> As per documentation i did not find any inbuilt method in the PulsarSource
> class to pass the TLS certs, i tried using the PulsarClient options as
> config and pass it to PulsarSource as option.
>
> This doesn't seem to work, as when i try to deploy the app, the Flink job
> is submitted and JobManager throws the below error.
>
> Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
>     at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) ~[?:?]
>     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
>     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
>
>
> Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown Source) ~[?:?]
>     at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown Source) ~[?:?]
>     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
>     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
>
> I have already verified the certs path and it is correct, also i am using
> the same path as a volume mount for my other apps and they work fine.
>
> My question is :
>
> How i can pass the certs to the latest version of the
> *flink-connector-pulsar* i.e *4.0.0-1.17*
>