Posted to dev@flink.apache.org by Akhlaq Malik <in...@imalik8088.de> on 2020/05/10 21:13:11 UTC

Serialize Kafka messages with Confluent Schema Registry under Flink 1.9.1

Is it possible to publish messages to Kafka serialized with Confluent's KafkaAvroSerializer? I'm using Flink 1.9.1 and have seen that some development is going on in a newer version of flink-avro (1.11.0), but I'm stuck on this version.

I would like to use the newly introduced KafkaSerializationSchema to serialize messages to the Confluent Schema Registry and Kafka.

Currently I have a class that converts a type T to Avro, but I want to use the Confluent serialization instead.

```
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerialization.class);

    private final String topic;

    public KafkaMessageSerialization(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        // Plain Avro binary encoding of the record, without the Confluent wire format / schema id
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {
            writer.write(event, binEncoder);
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("serialization error", e);
            throw new RuntimeException(e);
        }

        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}
```

The usage is quite convenient:

.addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE))
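
For reference, this is roughly the direction I had in mind: wrapping Confluent's KafkaAvroSerializer inside a KafkaSerializationSchema and creating it lazily, since the serializer itself is not Serializable. This is only an untested sketch; the class name, the constructor parameters, and passing the registry URL via "schema.registry.url" are my own assumptions, not something I found documented for 1.9.1.

```
import java.util.Collections;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConfluentAvroSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {

    private final String topic;
    private final String schemaRegistryUrl;

    // KafkaAvroSerializer is not java.io.Serializable, so create it lazily on the task manager
    private transient KafkaAvroSerializer valueSerializer;

    public ConfluentAvroSerialization(String topic, String schemaRegistryUrl) {
        this.topic = topic;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        if (valueSerializer == null) {
            valueSerializer = new KafkaAvroSerializer();
            // isKey = false: this serializer handles the record value
            valueSerializer.configure(
                    Collections.singletonMap("schema.registry.url", schemaRegistryUrl), false);
        }
        // The Confluent serializer registers the schema and prepends the magic byte + schema id
        return new ProducerRecord<>(topic, valueSerializer.serialize(topic, event));
    }
}
```

The sink wiring would stay the same apart from passing the registry URL into the constructor. Is something like this the recommended approach under 1.9.1?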