Posted to user@flink.apache.org by Bauddhik Anand <bd...@gmail.com> on 2023/05/15 04:04:52 UTC
How to pass the TLS certs to the latest version of flink-connector-pulsar
I am trying to connect my Flink application to a Pulsar topic to ingest
data. The topic is active, and I am able to ingest the data via a normal
Java application.
When I try to ingest data from the same topic with the Flink application,
using the latest version of flink-connector-pulsar, i.e. 4.0.0-1.17, I
cannot find anywhere in the documentation how to pass the TLS certs.
I tried the code below:
final StreamExecutionEnvironment envn =
        StreamExecutionEnvironment.getExecutionEnvironment();

Configuration config = new Configuration();
config.setString("pulsar.client.authentication", "tls");
config.setString("pulsar.client.tlsCertificateFilePath", tlsCert);
config.setString("pulsar.client.tlsKeyFilePath", tlsKey);
config.setString("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);

PulsarSource<String> pulsarSource = PulsarSource.builder()
        .setServiceUrl("serviceurl")
        .setAdminUrl("adminurl")
        .setStartCursor(StartCursor.earliest())
        .setTopics("topicname")
        .setDeserializationSchema(new SimpleStringSchema())
        .setSubscriptionName("test-sub")
        .setConfig(config)
        .build();

// Assumption: the stream is built from the source; this step is missing from the snippet.
DataStream<String> pulsarStream =
        envn.fromSource(pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");

pulsarStream.map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -999736771747691234L;

    @Override
    public String map(String value) throws Exception {
        return "Receiving from Pulsar : " + value;
    }
}).print();

envn.execute();
As far as I can tell from the documentation, the PulsarSource class has no
built-in method for passing the TLS certs, so I set the PulsarClient options
in a Configuration and passed it to the PulsarSource builder.
This doesn't seem to work: when I deploy the app, the Flink job is
submitted and the JobManager throws the error below.
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
    at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) ~[?:?]
    at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown Source) ~[?:?]
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown Source) ~[?:?]
    at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
    at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
I have already verified that the certs path is correct; I also use the same
path as a volume mount for my other apps, and they work fine.
My question is:
How can I pass the certs to the latest version of
*flink-connector-pulsar*, i.e. *4.0.0-1.17*?
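On the PKIX error quoted above: it means the JVM could not build a chain from the broker's certificate to any trust anchor it was using. Since the exception is raised by the JobManager, the trust cert file has to be readable inside the JobManager (and TaskManager) containers, not only on the machine that submits the job. The snippet below is a minimal sketch of a standalone check that could be run inside those containers. The class name `TrustCertCheck` and its method are hypothetical; only JDK classes are used, and it makes no claim about how the connector itself loads the file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;

/** Hypothetical helper: checks that a trust cert file is readable and parses as X.509. */
final class TrustCertCheck {

    /**
     * Returns the number of X.509 certificates found in the file,
     * or -1 if the file is missing, unreadable, or cannot be parsed.
     */
    static int countCertificates(String file) {
        Path path = Paths.get(file);
        if (!Files.isReadable(path)) {
            return -1;
        }
        try (InputStream in = Files.newInputStream(path)) {
            return CertificateFactory.getInstance("X.509")
                    .generateCertificates(in)
                    .size();
        } catch (IOException | CertificateException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        // Pass the cert paths as arguments, e.g. the value of tlsTrustCert.
        for (String file : args) {
            System.out.println(file + " -> " + countCertificates(file));
        }
    }
}
```

A result of -1 or 0 for the trust cert path would point at the mount or the file contents rather than at the connector options.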
Re: How to pass the TLS certs to the latest version of flink-connector-pulsar
Posted by Bauddhik Anand <bd...@gmail.com>.
Thanks for your response.
Yes, the service URL, admin URL, topic, etc. are correct and have the
correct prefix as well.
It works with a normal Java application.
I am not sure how I can pass the TLS certs.
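For readers following the thread, here is a minimal sketch of one way the TLS settings could be assembled, assuming the connector forwards `pulsar.client.*` keys to the underlying Pulsar client as the 1.17 connector documentation describes. The keys `pulsar.client.authPluginClassName`, `pulsar.client.authParams` and `pulsar.client.tlsTrustCertsFilePath`, the `AuthenticationTls` class name, and the `tlsCertFile:...,tlsKeyFile:...` parameter format come from the Flink and Pulsar documentation. The helper class `PulsarTlsOptions`, its method names, and the file paths are hypothetical, and the sketch has not been verified against 4.0.0-1.17.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: collects TLS-related Pulsar client options as plain strings. */
final class PulsarTlsOptions {

    // Pulsar's built-in TLS authentication plugin.
    static final String TLS_AUTH_PLUGIN =
            "org.apache.pulsar.client.impl.auth.AuthenticationTls";

    /** Builds the "key:value,key:value" parameter string for the TLS auth plugin. */
    static String authParams(String certFile, String keyFile) {
        return "tlsCertFile:" + certFile + ",tlsKeyFile:" + keyFile;
    }

    /** Returns the option keys and values in insertion order. */
    static Map<String, String> build(String certFile, String keyFile, String trustCertsFile) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("pulsar.client.authPluginClassName", TLS_AUTH_PLUGIN);
        options.put("pulsar.client.authParams", authParams(certFile, keyFile));
        options.put("pulsar.client.tlsTrustCertsFilePath", trustCertsFile);
        return options;
    }

    public static void main(String[] args) {
        // Placeholder paths; substitute the mounted cert locations.
        build("/certs/tls.crt", "/certs/tls.key", "/certs/ca.crt")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Each entry could then be copied into the Flink `Configuration` with `config.setString(key, value)` before calling `setConfig(config)`, in place of the `pulsar.client.authentication` key, which does not seem to be among the documented options.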
On Tue, 16 May, 2023, 14:52 Weihua Hu, <hu...@gmail.com> wrote:
> Hi,
>
> Did you try setting 'serviceurl' so that it starts with "pulsar+ssl://"? [1]
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#pulsar-client-serviceurl
>
> Best,
> Weihua
Re: How to pass the TLS certs to the latest version of flink-connector-pulsar
Posted by Weihua Hu <hu...@gmail.com>.
Hi,
Did you try setting 'serviceurl' so that it starts with "pulsar+ssl://"? [1]
[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#pulsar-client-serviceurl
Best,
Weihua
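The scheme check suggested above can be sketched without any Flink or Pulsar dependency. The snippet below is a minimal sketch, assuming a TLS-enabled broker is addressed with the `pulsar+ssl://` scheme and the admin endpoint with `https://`. The class `PulsarUrlCheck`, its method names, and the example host names are hypothetical.

```java
import java.net.URI;

/** Hypothetical helper: checks that Pulsar URLs use a TLS scheme. */
final class PulsarUrlCheck {

    /** True if the broker service URL uses the pulsar+ssl scheme. */
    static boolean isTlsServiceUrl(String url) {
        // getScheme() returns null for strings without a scheme, e.g. "serviceurl".
        return "pulsar+ssl".equalsIgnoreCase(URI.create(url).getScheme());
    }

    /** True if the admin (HTTP) URL uses https. */
    static boolean isTlsAdminUrl(String url) {
        return "https".equalsIgnoreCase(URI.create(url).getScheme());
    }

    public static void main(String[] args) {
        // Placeholder hosts; 6651 and 8443 are Pulsar's default TLS ports.
        System.out.println(isTlsServiceUrl("pulsar+ssl://broker.example.com:6651"));
        System.out.println(isTlsAdminUrl("https://broker.example.com:8443"));
    }
}
```

A plain `pulsar://` or `http://` URL would fail this check, which may be worth ruling out before looking at the certificate options.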
Re: How to pass the TLS certs to the latest version of flink-connector-pulsar
Posted by Bauddhik Anand <bd...@gmail.com>.
Can someone please help with this?