Posted to user@beam.apache.org by Tao Li <ta...@zillow.com> on 2020/12/08 19:59:03 UTC

Quick question about KafkaIO.Write

Hi Beam community,

I have a quick question about the withValueSerializer() method of the KafkaIO.Write<K,V> class: https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html

The withValueSerializer method does not support passing in a serializer provider. Without that functionality, I cannot use the Kafka schema registry to fetch the schema for serialization.

At the same time, however, the KafkaIO.Read<K,V> withKeyDeserializer method does support specifying a deserializer provider: https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withKeyDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-

Is this a gap for KafkaIO.Write<K,V> or is it by design? Is there a workaround to specify the schema registry info for KafkaIO.Write<K,V>?
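For reference, the Read-side counterpart mentioned above can be wired up roughly like this (a sketch; the broker address, topic, and subject name are placeholders, not values from this thread):

```java
// Read-side sketch: KafkaIO.Read accepts a DeserializerProvider, so the
// Confluent Schema Registry can be plugged in directly. All endpoints and
// names below are placeholders.
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RegistryReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(KafkaIO.<byte[], GenericRecord>read()
        .withBootstrapServers("localhost:9092")           // placeholder
        .withTopic("my-topic")                            // placeholder
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(
            // Fetches the subject's schema from the registry and builds a
            // matching Avro deserializer; Write has no equivalent hook.
            ConfluentSchemaRegistryDeserializerProvider.of(
                "http://localhost:8081", "my-topic-value"))
        .withoutMetadata());
    p.run().waitUntilFinish();
  }
}
```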

Thanks so much!

Re: Quick question about KafkaIO.Write

Posted by Tao Li <ta...@zillow.com>.
@Alexey Romanenko<ma...@gmail.com> thanks so much for your suggestions.

Actually, I found that the code below seems to work.

KafkaIO
        .<Void, GenericRecord>write()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topicName)
        .withValueSerializer((Class) KafkaAvroSerializer.class)
        .withProducerConfigUpdates(ImmutableMap.of("schema.registry.url", schemaRegistryUrl))

Thanks, and I hope there will be more great improvements coming in the future, as you mentioned 😊



Re: Quick question about KafkaIO.Write

Posted by Alexey Romanenko <ar...@gmail.com>.
AFAIR, DeserializerProvider was added to KafkaIO along with Confluent Schema Registry support in KafkaIO.Read, to provide a universal way to use different deserializers (currently Local and ConfluentSchemaRegistry).

Regarding the Write part, I believe we can do a similar refactoring. Feel free to provide a patch; we can help with review, testing, and advice.

For now, just an idea for a workaround (I didn't test it): fetch your schema from the Schema Registry in advance yourself with SchemaRegistryClient, use it to create the Avro records to write (e.g. GenericRecord), then set KafkaAvroSerializer as the value serializer and specify "schema.registry.url" in the producer properties.
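An untested sketch of that workaround follows. The registry URL, brokers, topic, and field name are all placeholders, and it assumes the Confluent kafka-avro-serializer and kafka-schema-registry-client artifacts are on the classpath:

```java
// Workaround sketch: fetch the latest schema for the topic's value subject
// up front, build GenericRecords against it, and let KafkaAvroSerializer
// talk to the registry at write time.
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Create;

public class RegistryWriteWorkaround {
  public static void main(String[] args) throws Exception {
    String schemaRegistryUrl = "http://localhost:8081"; // placeholder
    String topic = "my-topic";                          // placeholder

    // 1. Fetch the latest value schema for the topic's subject in advance.
    CachedSchemaRegistryClient client =
        new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
    Schema schema = new Schema.Parser().parse(
        client.getLatestSchemaMetadata(topic + "-value").getSchema());

    // 2. Build the records to write against that schema.
    GenericRecord record = new GenericRecordBuilder(schema)
        .set("someField", "someValue") // placeholder field
        .build();

    // 3. Let KafkaAvroSerializer handle schema lookup at write time. The
    //    raw (Class) cast is needed because KafkaAvroSerializer implements
    //    Serializer<Object>, not Serializer<GenericRecord>.
    Pipeline p = Pipeline.create();
    p.apply(Create.of(record).withCoder(AvroCoder.of(schema)))
        .apply(KafkaIO.<Void, GenericRecord>write()
            .withBootstrapServers("localhost:9092") // placeholder
            .withTopic(topic)
            .withValueSerializer((Class) KafkaAvroSerializer.class)
            .withProducerConfigUpdates(
                ImmutableMap.of("schema.registry.url", schemaRegistryUrl))
            .values());
    p.run().waitUntilFinish();
  }
}
```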
