Posted to user@flink.apache.org by Patrick Eifler <pa...@gmail.com> on 2020/11/03 13:30:58 UTC

Run Flink Job with Confluent Schema Registry over SSL

Hello,

I'm running a Flink Session Cluster on K8s and deploy the Flink jobs using the Flink REST API. The jobs use Avro for the producers and consumers. The jobs consume and produce from/to a secured Kafka cluster via TLS and SCRAM-SHA. Everything works as expected.
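
For context, the Kafka side of the jobs is configured along these lines (broker address, credentials and paths below are placeholders, not my actual settings):

import java.util.Properties

// Illustrative Kafka client settings for TLS + SCRAM-SHA; all values are placeholders.
val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", "kafka:9093")
kafkaProps.setProperty("security.protocol", "SASL_SSL")
kafkaProps.setProperty("sasl.mechanism", "SCRAM-SHA-256")
kafkaProps.setProperty("sasl.jaas.config",
  "org.apache.kafka.common.security.scram.ScramLoginModule required " +
  "username=\"user\" password=\"secret\";")
kafkaProps.setProperty("ssl.truststore.location", "/config/keystore/keystore.jks")
kafkaProps.setProperty("ssl.truststore.password", "<truststore-pass>")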

Now I need to introduce the Schema Registry. To accomplish this I use the library from Flink: flink-avro-confluent-registry, version 1.11.1.
Today I found out that this library uses an old version of the kafka-schema-registry-client; I noticed this when I checked the META-INF of the library's Kafka Schema Registry Client dependency.
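The version can be read straight from the client jar's META-INF, e.g. (jar name illustrative):

unzip -p kafka-schema-registry-client-4.1.0.jar \
    META-INF/maven/io.confluent/kafka-schema-registry-client/pom.properties

which prints: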

#Created by Apache Maven 3.3.9
version=4.1.0
groupId=io.confluent
artifactId=kafka-schema-registry-client
 
I think this is the main problem: the schema registry deployed on my cluster runs version 5.5, and there have been considerable improvements since version 5.4, especially around SSL support. See the following PR, which was merged into version 5.4: https://github.com/confluentinc/schema-registry/pull/957/files. See also: https://docs.confluent.io/current/schema-registry/security/index.html#additional-configurations-for-https

So far none of the documented solutions has worked. It is also not possible to implement my own serializer on top of this Flink library, because all the important classes have private or protected constructors.

So to my question: 
Will this library, flink-avro-confluent-registry, be updated to use the latest Kafka Schema Registry Client (v5.5) to support SSL? And will it be possible to pass the config map with the schema registry properties into the serializer, as described in the documentation?

I tried all the documented options without success, adding all the properties as described:

val props: Map[String, String] = Map[String, String](
      "schema.registry.ssl.keystore.location" -> "/config/keystore/keystore.jks",
      "schema.registry.ssl.keystore.password" -> kafkaSettings.keystorePass.get,
      "schema.registry.ssl.truststore.location" -> "/config/keystore/keystore.jks",
      "schema.registry.ssl.truststore.password" -> kafkaSettings.keystorePass.get,
      "schema.registry.ssl.key.password" -> kafkaSettings.keystorePass.get
    )
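
For completeness, this map is then passed into the (de)serialization schema, roughly as the documentation describes it (the Avro schema and registry URL below are illustrative stand-ins, not my actual values):

import org.apache.avro.Schema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema

import scala.collection.JavaConverters._

// Minimal illustrative Avro schema; the real jobs use their own record types.
val schema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}""")

// Registry URL is a placeholder; props is the SSL config map from above.
val deserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(
  schema,
  "https://schema-registry:8081",
  props.asJava
)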

I also tried the legacy approach of putting the keystore and truststore directly on the JVM (via the javax.net.ssl system properties). Nothing has worked so far.
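
Concretely, that attempt looked something like this (passwords elided):

-Djavax.net.ssl.keyStore=/config/keystore/keystore.jks
-Djavax.net.ssl.keyStorePassword=<keystore-pass>
-Djavax.net.ssl.trustStore=/config/keystore/keystore.jks
-Djavax.net.ssl.trustStorePassword=<truststore-pass>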

So if someone has found a way to implement SSL with Flink 1.11.1 against an SSL-secured Confluent Schema Registry, please reach out, or advise on the development of the library.

Many thanks for your time.

Cheers,
 
Patrick

Re: Run Flink Job with Confluent Schema Registry over SSL

Posted by Robert Metzger <rm...@apache.org>.
Hi Patrick,

The upcoming Flink 1.12 release will update the version to 5.4.2 at least:
https://github.com/apache/flink/pull/12919/files
This is closer to what you need, but still not there :(

What you can try is compiling your own version of flink-avro-confluent-registry, passing -Dconfluent.version=5.5.0 to mvn.
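
Something along these lines should do it (module path taken from the Flink source tree; the exact flags are just a suggestion):

git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.11.1
mvn clean install -DskipTests -Dconfluent.version=5.5.0 \
    -pl flink-formats/flink-avro-confluent-registry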

There is already a ticket for bumping the version:
https://issues.apache.org/jira/browse/FLINK-18546
Maybe we'll manage to merge this for the 1.12 release.

Best,
Robert


