Posted to users@kafka.apache.org by Alessandro Tagliapietra <ta...@gmail.com> on 2020/01/07 06:31:10 UTC

Issue with Kafka Connect custom name strategy

Hello everyone,

I'm trying to run Kafka Connect to dump Avro messages to S3, but I'm
having issues with the value converter. Our topics have a string key and
an Avro value.

I'm running the Docker image confluentinc/cp-kafka-connect:5.3.2 with the
following env variables for the value converter:

CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: xxx:xxx
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://xxx.us-east-2.aws.confluent.cloud
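
For context, the image turns each CONNECT_* env variable into a worker
property by stripping the prefix, lowercasing, and replacing underscores
with dots, so my understanding is that the above is equivalent to this
worker config (a sketch of the mapping, not copied from a running
worker):

value.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=xxx:xxx
value.converter.schema.registry.url=https://xxx.us-east-2.aws.confluent.cloud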


After setting up the S3 sink with this config:

{
  "name": "raw-dump",
  "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "partitioner.class":
"io.confluent.connect.storage.partitioner.HourlyPartitioner",
      "locale": "en-US",
      "timezone": "UTC",
      "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
      "tasks.max": "1",
      "topics": "xxx",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.region": "eu-west-1",
      "s3.bucket.name": "confluent-kafka-connect-s3-testing",
      "flush.size": 1000,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
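
(I'm creating the connector by POSTing that JSON to the worker's REST
API; assuming the JSON above is saved as raw-dump.json and the worker
listens on the default port 8083:)

curl -X POST -H "Content-Type: application/json" \
  --data @raw-dump.json \
  http://localhost:8083/connectors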


I was getting this error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic xxx to Avro:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 100023
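
To double-check what the lookup is failing on, the registry can be asked
for that schema id directly, with the same credentials the converter
uses (the converter does the equivalent request internally):

curl -u xxx:xxx \
  https://xxx.us-east-2.aws.confluent.cloud/schemas/ids/100023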


The log showed this value converter config:

[2020-01-07 06:16:11,670] INFO AvroConverterConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.connect.avro.AvroConverterConfig)
[2020-01-07 06:16:11,694] INFO KafkaAvroSerializerConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig)
[2020-01-07 06:16:11,699] INFO KafkaAvroDeserializerConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    specific.avro.reader = false
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

So I thought it was because of our custom name strategy. I added our own
JAR and noticed that:

 - if I only add "value.converter.(key|value).subject.name.strategy" to
the connector config, it isn't read by the connector
 - if I add this config to the connector (so the converter is set both in
the env variables and in the connector config):

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "xxx:xxx",
"value.converter.schema.registry.url": "
https://xxx.us-east-2.aws.confluent.cloud",
"value.converter.key.subject.name.strategy":
"co.myapp.serializers.subject.EnvironmentTopicNameStrategy",
"value.converter.value.subject.name.strategy":
"co.myapp.serializers.subject.EnvironmentTopicNameStrategy"


   it finally tries to use our custom EnvironmentTopicNameStrategy, but I
get "java.lang.RuntimeException:
co.myapp.serializers.subject.EnvironmentTopicNameStrategy is not an
instance of
io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy" even
though the class extends TopicNameStrategy
 - just adding the value.converter (and auth/schema registry url) configs
results in the same error as above, just with the default class:
"java.lang.RuntimeException:
io.confluent.kafka.serializers.subject.TopicNameStrategy is not an
instance of
io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"

Any ideas? It seems that setting value.converter on the connector breaks
even the default subject name strategy class.
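
For reference, a strategy can also implement the interface named in the
error directly, instead of extending TopicNameStrategy. Here's a minimal
sketch, assuming the generic interface shape shipped with the 5.3.x
schema-registry client (the "environment" config key and its default are
made up for illustration; verify the exact signature against the client
version on your classpath):

package co.myapp.serializers.subject;

import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;
import java.util.Map;

// Implements the new interface directly rather than extending
// TopicNameStrategy. The generic signature below is my understanding of
// the 5.3.x client, so double-check it against your dependency.
public class EnvironmentTopicNameStrategy implements SubjectNameStrategy<Object> {

  // Hypothetical config key and default, for illustration only.
  private String environment = "production";

  @Override
  public void configure(Map<String, ?> configs) {
    Object env = configs.get("environment");
    if (env != null) {
      environment = env.toString();
    }
  }

  @Override
  public String subjectName(String topic, boolean isKey, Object schema) {
    // Same <topic>-key / <topic>-value layout as TopicNameStrategy,
    // prefixed with an environment tag.
    return environment + "." + topic + (isKey ? "-key" : "-value");
  }
}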

Thanks in advance

--
Alessandro Tagliapietra

Re: Issue with Kafka Connect custom name strategy

Posted by Alessandro Tagliapietra <ta...@gmail.com>.
I was able to make some progress: setting the
CONNECT_VALUE_CONVERTER_KEY_SUBJECT_NAME_STRATEGY and
CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY env variables fixed
the issue.
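
Concretely, that meant adding these (same class name as in the connector
config from my first message):

CONNECT_VALUE_CONVERTER_KEY_SUBJECT_NAME_STRATEGY: co.myapp.serializers.subject.EnvironmentTopicNameStrategy
CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY: co.myapp.serializers.subject.EnvironmentTopicNameStrategy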

However, this makes it impossible to have a custom name strategy per
connector. Just to sum up:
 - setting the value converter as an env variable without a custom name
strategy works, as long as you don't need a custom name strategy
 - setting the value converter globally and trying to override the name
strategy in a connector config doesn't work; the override is just ignored
 - setting the value converter both globally and in a connector config
(even without a custom name strategy) generates:
    "java.lang.RuntimeException:
io.confluent.kafka.serializers.subject.TopicNameStrategy is not an
instance of
io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"

Is that exception expected in the last case, or is the configuration
wrong?

--
Alessandro Tagliapietra


