Posted to user@flink.apache.org by Dan Serb <da...@boatyardx.com> on 2022/04/05 08:54:11 UTC

FlinkKafkaProducer - Avro - Schema Registry

Hi guys,

I’m working on a solution where I ingest Kafka records and need to sink them to another topic using Avro and the Schema Registry.
The problem I’m facing is that I can’t find a configuration that actually works for me.

I’m going to explain.


  1.  I have a KafkaSource that consumes the initial stream of data.
  2.  I have an operator that maps the Kafka records to Avro objects (Java POJOs generated with the Maven Avro plugin, based on .avsc files).
  3.  I register the schemas in the Schema Registry using the mvn schema-registry:register plugin/goal (registering the schema type as AVRO).
  4.  I have a FlinkKafkaProducer<GeneratedAvroObject> to which I provide a serialization schema of type ConfluentRegistryAvroSerializationSchema.

My Kafka Properties for the Producer:

kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
kafkaProps.put(
    KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schemaregistry:38081");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put("auto.register.schemas", false);
kafkaProps.put("use.latest.version", true);

As I learned from other tutorials/articles, I need to set KafkaAvroSerializer.class as the ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.
This should eventually get me into KafkaAvroSerializer, which, based on what the record actually looks like, fetches the matching schema from the Schema Registry and serializes the record with it before it gets sent.
The problem I’m having is that in the FlinkKafkaProducer class, in the invoke() method, keyedSchema is null in my case but kafkaSchema is not, so it does a ‘pre-serialization’ that turns my record into a byte[]. This matters once the value reaches the KafkaAvroSerializer: the record is already a byte[], so the serializer derives a schema of type “bytes” instead of the schema I have for that SpecificRecord. And when it fetches the proper schema from the Schema Registry, it fails as not compatible: schema {} is not compatible with schema of type “bytes”.
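
To make it concrete, here is a tiny illustration of the mismatch (not my job code; the class name is made up, and it assumes the Confluent libraries where AvroSchemaUtils lives in io.confluent.kafka.schemaregistry.avro, which is what the Avro serializer uses to derive the writer schema):

import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import org.apache.avro.Schema;

public class BytesSchemaIllustration {
  public static void main(String[] args) {
    // By the time the Kafka client's value serializer runs, FlinkKafkaProducer's
    // internal SerializationSchema has already turned the AvroObject into a byte[].
    byte[] preSerialized = new byte[] {0, 1, 2};

    // The Avro serializer derives the writer schema from the runtime type of the value
    // it receives; for a byte[] that is just the primitive "bytes" schema, which the
    // registry then reports as incompatible with the registered record schema.
    Schema derived = AvroSchemaUtils.getSchema(preSerialized);
    System.out.println(derived); // prints "bytes"
  }
}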

For more context, this is how my Processor looks at this moment.


DataStream<ObjectNode> kafkaRecords =
    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka");

SingleOutputStreamOperator<AvroObject> producedRecords =
    kafkaRecords
        .map(
            value -> {
              String kafkaKey = value.get(KEY).asText();
              String kafkaRecordJson = MAPPER.writeValueAsString(value.get(VALUE));
              return Converter.convert(kafkaKey, kafkaRecordJson);
            })
        .returns(TypeInformation.of(AvroObject.class));

AvroSerializationSchema<AvroObject> schema =
        ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class);

FlinkKafkaProducer<AvroObject> kafkaProducer =
        new FlinkKafkaProducer<>("sink_topic", schema, kafkaProps);

producedRecords.addSink(kafkaProducer);

env.execute();

Exception:
Caused by: java.io.IOException: Incompatible schema { avro schema here } with refs [] of type AVRO for schema "bytes".

PS: If I remove the KafkaAvroSerializer from the producer properties, producing works fine, but when I consume the messages, the first record comes back with only default values, and the second one throws an EOFException (I haven't been able to debug the exact cause yet). It seems that without the KafkaAvroSerializer the producer doesn't actually go to the Schema Registry to fetch the schema and use it for serialization, so I think I definitely need it there, but I probably also need more config changes in other places, because it's definitely not working as expected.

Thanks a lot!
I would appreciate at least some pointers on where I could investigate further, and if someone else has a similar implementation, maybe some tips and tricks.

Regards,
Dan Serb



Re: FlinkKafkaProducer - Avro - Schema Registry

Posted by Dan Serb <da...@boatyardx.com>.
Hello Qingsheng,

Removing KafkaAvroSerializer from the producer properties worked, indeed.
I validated this by consuming with a FlinkKafkaConsumer that uses ConfluentRegistryAvroDeserializationSchema, and it's working properly.
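
For reference, the validation consumer looks roughly like this (just a sketch: the group id is a placeholder, and the topic and registry URL are the ones from my first mail):

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "kafka:9092");
consumerProps.setProperty("group.id", "avro-validation"); // placeholder group id

FlinkKafkaConsumer<AvroObject> kafkaConsumer =
    new FlinkKafkaConsumer<>(
        "sink_topic",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(
            AvroObject.class, "http://schemaregistry:38081"),
        consumerProps);

env.addSource(kafkaConsumer).print();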

The problem I'm still having is that I will have to use the Schema Registry with multiple schema types registered, for messages that arrive on the same Kafka topic.
That means I need the behavior that KafkaAvroSerializer.class provides, namely going to the Schema Registry to look up the schema by subject.

By only using ConfluentRegistryAvroSerializationSchema.forSpecific() in my FlinkKafkaProducer, I don't see how I can get that functionality; I debugged it internally, and it doesn't go through the path I would like it to take.

So, in conclusion, I think I still somehow need the KafkaAvroSerializer in the producer setup to force the serialization to go through the Schema Registry.
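
One way I'm thinking of keeping the KafkaAvroSerializer (and its subject lookup) without fighting FlinkKafkaProducer's pre-serialization would be to wrap it in a KafkaSerializationSchema instead of setting it as value.serializer. This is only a sketch, nothing I have running yet; the wrapper class name is made up, and the RecordNameStrategy choice is my assumption for getting several record types onto one topic:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Map;

public class RegistryAvroKafkaSerializationSchema
    implements KafkaSerializationSchema<SpecificRecord> {

  private final String topic;
  private final Map<String, Object> serializerConfig;
  private transient KafkaAvroSerializer valueSerializer;

  public RegistryAvroKafkaSerializationSchema(String topic, Map<String, Object> serializerConfig) {
    this.topic = topic;
    this.serializerConfig = serializerConfig;
  }

  @Override
  public ProducerRecord<byte[], byte[]> serialize(SpecificRecord element, Long timestamp) {
    if (valueSerializer == null) {
      // created lazily on the task, because KafkaAvroSerializer itself is not Serializable
      valueSerializer = new KafkaAvroSerializer();
      valueSerializer.configure(serializerConfig, false); // false = value serializer
    }
    return new ProducerRecord<>(topic, valueSerializer.serialize(topic, element));
  }
}

Wired up like this (kafkaProps must not set value.serializer, so the producer keeps its default ByteArraySerializer):

Map<String, Object> serializerConfig = new HashMap<>();
serializerConfig.put("schema.registry.url", "http://schemaregistry:38081");
serializerConfig.put("auto.register.schemas", false);
serializerConfig.put("use.latest.version", true);
// subject = fully-qualified record name, so several record types can share one topic
serializerConfig.put("value.subject.name.strategy",
    "io.confluent.kafka.serializers.subject.RecordNameStrategy");

FlinkKafkaProducer<SpecificRecord> kafkaProducer =
    new FlinkKafkaProducer<>(
        "sink_topic",
        new RegistryAvroKafkaSerializationSchema("sink_topic", serializerConfig),
        kafkaProps,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

With RecordNameStrategy the schemas would of course have to be registered under the record-name subjects rather than under sink_topic-value.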

Regards,
Dan Serb

On 08.04.2022, 05:23, "Qingsheng Ren" <re...@gmail.com> wrote:

    Hi Dan,

    In FlinkKafkaProducer, records are serialized by the SerializationSchema specified in the constructor, which is the “schema” (ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class)) in your case, instead of the serializer specified in producer properties. The default serializer used by FlinkKafkaProducer is ByteArraySerializer, so the flow of serialization would be:

    [AvroObject] -> SerializationSchema -> [Bytes] -> ByteArraySerializer -> [Bytes]

    So I think removing KafkaAvroSerializer from the producer config and using AvroSerializationSchema is the right way. As you mentioned that messages could not be consumed back successfully, could you provide more information about how you consume messages from Kafka (e.g. using Flink's KafkaSource or just a plain KafkaConsumer, and maybe also the configuration you are using)?

    Best regards,

    Qingsheng





Re: FlinkKafkaProducer - Avro - Schema Registry

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Dan,

In FlinkKafkaProducer, records are serialized by the SerializationSchema specified in the constructor, which is the “schema” (ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class)) in your case, instead of the serializer specified in producer properties. The default serializer used by FlinkKafkaProducer is ByteArraySerializer, so the flow of serialization would be:

[AvroObject] -> SerializationSchema -> [Bytes] -> ByteArraySerializer -> [Bytes]

So I think removing KafkaAvroSerializer from the producer config and using AvroSerializationSchema is the right way. As you mentioned that messages could not be consumed back successfully, could you provide more information about how you consume messages from Kafka (e.g. using Flink's KafkaSource or just a plain KafkaConsumer, and maybe also the configuration you are using)?
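
A minimal sketch of what I mean, reusing the endpoints from your mail (this assumes a Flink version where ConfluentRegistryAvroSerializationSchema.forSpecific takes the class, the subject and the registry URL; the subject "sink_topic-value" is just the default TopicNameStrategy naming, so adjust it to whatever you registered):

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka:9092");
// no value.serializer here, so FlinkKafkaProducer keeps its default ByteArraySerializer

FlinkKafkaProducer<AvroObject> kafkaProducer =
    new FlinkKafkaProducer<>(
        "sink_topic",
        ConfluentRegistryAvroSerializationSchema.forSpecific(
            AvroObject.class, "sink_topic-value", "http://schemaregistry:38081"),
        producerProps);

producedRecords.addSink(kafkaProducer);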

Best regards,

Qingsheng

