Posted to user@flink.apache.org by "Ghiya, Jay (GE Healthcare)" <Ja...@ge.com> on 2022/05/18 16:21:06 UTC

Kafka Sink Key and Value Avro Schema Usage Issues

Hi Team,

This is regarding the Flink Kafka sink. We have a use case where records carry headers, and both the key and the value are Avro records with their own schemas.

Below is what we would intuitively expect to be able to write for an Avro key and value:

KafkaSink.<ProducerRecord<Key, Value>>builder()
        .setBootstrapServers(cloudkafkaBrokerAPI)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setKeySerializationSchema(
                                ConfluentRegistryAvroSerializationSchema.forSpecific(
                                        Key.class, "Key", cloudSchemaRegistryURL))
                        .setValueSerializationSchema(
                                ConfluentRegistryAvroSerializationSchema.forSpecific(
                                        Value.class, "val", cloudSchemaRegistryURL))
                        .setTopic(outputTopic)
                        .build())
        .build();

As I understand it, the Kafka sink currently does not accept Avro schemas for both the key and the value in this way. It only accepts sink.

First I tried the deprecated Flink Kafka producer (FlinkKafkaProducer), implementing KafkaSerializationSchema and setting the Confluent Avro serializer properties via:
cloudKafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
cloudKafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());


This example runs, but the schema that gets stored in the Confluent Schema Registry is:
{
    "subject": "ddp_out-key",
    "version": 1,
    "id": 1,
    "schema": "\"bytes\""
}

Instead of the full schema, the registry has recorded the whole key as bytes. So for now I am looking for a solution that works without the Kafka sink. Also, is there a feature request on the roadmap for adding ProducerRecord support to the Kafka sink itself? That would be ideal: the previous operator could emit the ProducerRecord with key, value and headers, and the sink would simply forward it.

-Jay
GEHC
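
A plausible explanation for the "bytes" schema, assuming the custom KafkaSerializationSchema returned a ProducerRecord<byte[], byte[]> (as the getKeyBytes/getValueBytes helpers in the follow-up message suggest): a registry-aware serializer picks the schema from the runtime type of the object it receives, and a key that is already a byte[] maps to the Avro primitive "bytes". The sketch below is a simplified model of that behaviour, not the Confluent implementation. The class and method names are hypothetical, and the topic name ddp_out is inferred from the subject ddp_out-key under the default topic-name subject convention.

```java
import java.nio.ByteBuffer;

final class SchemaInferenceSketch {

    // Simplified model (NOT the Confluent code): choose the Avro schema from
    // the runtime type of the object handed to the serializer.
    static String inferSchema(Object datum) {
        if (datum == null) {
            return "\"null\"";
        }
        if (datum instanceof byte[] || datum instanceof ByteBuffer) {
            // A pre-serialized key or value ends up here.
            return "\"bytes\"";
        }
        if (datum instanceof CharSequence) {
            return "\"string\"";
        }
        // Placeholder: a real Avro record would supply its own schema.
        return "<schema carried by the Avro record itself>";
    }

    // Default subject naming convention: "<topic>-key" or "<topic>-value".
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        byte[] preSerializedKey = {0x02, 0x06}; // arbitrary bytes standing in for an encoded key
        System.out.println(subjectFor("ddp_out", true) + " -> " + inferSchema(preSerializedKey));
        // prints: ddp_out-key -> "bytes"
    }
}
```

Under this reading, the serializer needs to receive the Avro object itself (or the bytes must be produced by a registry-aware schema such as ConfluentRegistryAvroSerializationSchema) for the full record schema to be registered.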



Re: Kafka Sink Key and Value Avro Schema Usage Issues

Posted by Peter Schrott <pe...@bluerootlabs.io>.
Hi Ghiy,

I am not quite sure what your actual problem is, or why the schema is not
generated as expected.

I also needed to work with the Kafka keys in my business logic, so I found
a way to deserialize and serialize the key along with the event itself by
implementing KafkaRecord[De]SerializationSchema. I am using Flink's new
Kafka Source / Sink API. The difference from your requirement is that my
keys are of type Array[Byte], but this could most certainly be adapted.


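import java.lang.{Long => JLong}
import scala.reflect.{ClassTag, classTag}

import org.apache.avro.specific.SpecificRecord
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware
import org.apache.kafka.clients.producer.ProducerRecord

class KeyedEventSerializationSchema[A <: SpecificRecord: ClassTag](topic: String, schemaRegistryUrl: String)
    extends KafkaRecordSerializationSchema[KeyedEvent[A]]
    with KafkaContextAware[KeyedEvent[A]] {

  private val eventClass = classTag[A].runtimeClass.asInstanceOf[Class[A]]
  // The value is serialized as Avro and registered under the class's canonical name.
  private val valueSchema =
    ConfluentRegistryAvroSerializationSchema.forSpecific(eventClass, eventClass.getCanonicalName, schemaRegistryUrl)
  // TODO maybe you could add the keySchema here accordingly...

  // The key is passed through as raw bytes; only the value goes through the Avro schema.
  override def serialize(element: KeyedEvent[A], context: KafkaSinkContext, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord(getTargetTopic(element), element.key, valueSchema.serialize(element.value))

  override def getTargetTopic(element: KeyedEvent[A]): String = topic
}

final case class KeyedEvent[A <: SpecificRecord](key: Array[Byte], value: A)

val keyedEventSerialization = new KeyedEventSerializationSchema[A](OutputTopicName, SchemaRegistryUrl)
val kafkaSinkBuilder = KafkaSink.builder[KeyedEvent[A]]()
kafkaSinkBuilder
  .setBootstrapServers(BootstrapServers)
  .setKafkaProducerConfig(ProducerProperties)
  .setRecordSerializer(keyedEventSerialization)
  .build()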

Maybe this helps.

Best Peter


RE: Kafka Sink Key and Value Avro Schema Usage Issues

Posted by "Ghiya, Jay (GE Healthcare)" <Ja...@ge.com>.
I also forgot to include how I converted the Avro objects to bytes in the approach I took with the deprecated Kafka producer.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;

    // Encodes the value as raw Avro binary; no schema id is written.
    protected byte[] getValueBytes(Value value) {
        DatumWriter<Value> valWriter = new SpecificDatumWriter<>(value.getSchema());
        ByteArrayOutputStream valOut = new ByteArrayOutputStream();
        BinaryEncoder valEncoder = EncoderFactory.get().binaryEncoder(valOut, null);

        try {
            valWriter.write(value, valEncoder);
            valEncoder.flush();
        } catch (IOException e) {
            // Fail loudly instead of silently returning partial bytes.
            throw new UncheckedIOException("Failed to serialize Avro value", e);
        }

        return valOut.toByteArray();
    }

    // Encodes the key as raw Avro binary; no schema id is written.
    protected byte[] getKeyBytes(Key key) {
        DatumWriter<Key> keyWriter = new SpecificDatumWriter<>(key.getSchema());
        ByteArrayOutputStream keyOut = new ByteArrayOutputStream();
        BinaryEncoder keyEncoder = EncoderFactory.get().binaryEncoder(keyOut, null);

        try {
            keyWriter.write(key, keyEncoder);
            keyEncoder.flush();
        } catch (IOException e) {
            // Fail loudly instead of silently returning partial bytes.
            throw new UncheckedIOException("Failed to serialize Avro key", e);
        }

        return keyOut.toByteArray();
    }
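
The helpers above emit plain Avro binary. Consumers that use the Confluent deserializers expect each key and value in the Confluent wire format instead: one magic byte (0), a 4-byte big-endian schema id, then the Avro payload. The following is a minimal sketch of that framing using only the JDK. The class and method names are hypothetical, and the schema id is assumed to have been obtained from the Schema Registry beforehand; in practice ConfluentRegistryAvroSerializationSchema takes care of both registration and framing.

```java
import java.nio.ByteBuffer;

final class ConfluentFraming {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 1 + Integer.BYTES; // magic byte + schema id

    // Prepends the magic byte and the big-endian schema id to an Avro binary payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Reads the schema id back from a framed message.
    static int schemaIdOf(byte[] framed) {
        if (framed.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for the wire-format header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        byte[] payload = {0x02, 0x06};   // arbitrary bytes standing in for Avro binary
        byte[] framed = frame(1, payload); // schema id 1 is an assumed example value
        System.out.println(framed.length);      // prints 7
        System.out.println(schemaIdOf(framed)); // prints 1
    }
}
```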





