Posted to user@beam.apache.org by Ismaël Mejía <ie...@gmail.com> on 2020/02/04 17:23:14 UTC

Re: Kafka Avro Schema Registry Support

Support for Confluent Schema Registry was merged into KafkaIO today. You can
test it with tomorrow's snapshots (version 2.20.0-SNAPSHOT) or when 2.20.0
gets released. Note that this was already possible before, but Alexey took
care of making it more user-friendly, since this is (was) a frequently
requested feature among Kafka/Avro users.
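
For reference, a minimal sketch of the new usage (assuming the
ConfluentSchemaRegistryDeserializerProvider entry point from the merged
change; the broker address, topic, subject, and registry URL below are
placeholders):

    // Read <String, GenericRecord> records, fetching the value schema
    // from the Confluent Schema Registry rather than a local file.
    PCollection<KV<String, GenericRecord>> records = pipeline.apply(
        KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers("broker:9092")          // placeholder
            .withTopic("my_topic")                        // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(
                    "http://localhost:8081", "my_topic-value"))
            .withoutMetadata());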



On Fri, Sep 28, 2018 at 6:58 PM Raghu Angadi <ra...@google.com> wrote:

> Looks like your producer is writing Avro specific records.
>
> Can you read the records using the bundled console consumer? I think it will
> be simpler for you to first get that returning valid records and then use the
> same deserializer config with your KafkaIO reader.
>
> On Fri, Sep 28, 2018 at 9:33 AM Vishwas Bm <bm...@gmail.com> wrote:
>
>> Hi Raghu,
>>
>> Thanks for the response. We are now trying with GenericAvroDeserializer,
>> but we are still seeing issues.
>> We have a producer which sends messages to Kafka in the format
>> <String, GenericRecord>.
>>
>> Below is the code snippet we have used with Beam KafkaIO:
>>
>>     org.apache.avro.Schema schema = null;
>>     try {
>>         schema = new org.apache.avro.Schema.Parser()
>>             .parse(new File("Schema path"));
>>     } catch (Exception e) {
>>         e.printStackTrace();
>>     }
>>
>>     KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>>         KafkaIO.<String, GenericRecord>read()
>>             .withBootstrapServers(bootstrapServerUrl)
>>             .withTopic(topicName)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializerAndCoder(
>>                 GenericAvroDeserializer.class, AvroCoder.of(schema))
>>             .updateConsumerProperties(
>>                 ImmutableMap.of("schema.registry.url", schemaUrl))
>>             .withTimestampPolicyFactory((tp, prevWatermark) ->
>>                 new KafkaCustomTimestampPolicy(
>>                     maxDelay, timestampInfo, prevWatermark));
>>
>> Below is the error seen:
>>
>> Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>>         at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
>>         at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
>>         at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
>>         at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
>>         at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
>>         ... 8 more
>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>>         at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
>>         at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>         at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
>>         at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
>>         at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
>>         at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
>>         at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
>>         at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
>>
>>
>> Can you provide some pointers on this?
>>
>>
>> *Thanks & Regards,*
>>
>> *Vishwas*
>>
>>
>>
>> On Fri, Sep 28, 2018 at 3:12 AM Raghu Angadi <ra...@google.com> wrote:
>>
>>> It is a compilation error due to a type mismatch on the value type.
>>>
>>> Please match the key and value types of the KafkaIO reader. I.e. if you have
>>> KafkaIO.<KeyType, ValueType>read(), 'withValueDeserializer()' needs a
>>> class object which extends 'Deserializer<ValueType>'. Since
>>> KafkaAvroDeserializer extends 'Deserializer<Object>', your ValueType
>>> needs to be Object instead of String.
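>>>
>>> For example, a minimal sketch of the type-level fix (bootstrapServerUrl and
>>> topicName stand in for your own values; note that Beam will also still need
>>> a Coder for the Object values, e.g. via withValueDeserializerAndCoder):
>>>
>>>     // KafkaAvroDeserializer implements Deserializer<Object>,
>>>     // so the value type parameter must be Object, not String.
>>>     KafkaIO.Read<String, Object> read =
>>>         KafkaIO.<String, Object>read()
>>>             .withBootstrapServers(bootstrapServerUrl)  // placeholder
>>>             .withTopic(topicName)                      // placeholder
>>>             .withKeyDeserializer(StringDeserializer.class)
>>>             .withValueDeserializer(KafkaAvroDeserializer.class);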
>>>
>>> Btw, it might be better to use GenericAvroDeserializer or
>>> SpecificAvroDeserializer from the same package.
>>>
>>>
>>> On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm <bm...@gmail.com> wrote:
>>>
>>>>
>>>> Hi Raghu,
>>>>
>>>> The deserializer is provided by the Confluent
>>>> *io.confluent.kafka.serializers* package.
>>>>
>>>> When we set the value deserializer to KafkaAvroDeserializer, we get the
>>>> below error:
>>>>
>>>>     The method withValueDeserializer(Class<? extends Deserializer<String>>)
>>>>     in the type KafkaIO.Read<String, String> is not applicable for the
>>>>     arguments (Class<KafkaAvroDeserializer>)
>>>>
>>>> From the error, it looks like Beam does not support this deserializer.
>>>> Also, we wanted to use the Schema Registry from Confluent; is this
>>>> supported in Beam?
>>>>
>>>>
>>>> *Thanks & Regards,*
>>>> *Vishwas*
>>>>
>>>>
>>>> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi <ra...@google.com>
>>>> wrote:
>>>>
>>>>> You can set key/value deserializers:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101
>>>>> What are the errors you see?
>>>>>
>>>>> Also note that Beam includes AvroCoder for handling Avro records.
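>>>>>
>>>>> For instance, a minimal sketch (schemaJson stands in for your Avro schema
>>>>> string; AvroCoder.of(schema) yields a coder for GenericRecords):
>>>>>
>>>>>     // Build a Beam coder from an Avro schema.
>>>>>     org.apache.avro.Schema schema =
>>>>>         new org.apache.avro.Schema.Parser().parse(schemaJson);
>>>>>     AvroCoder<GenericRecord> coder = AvroCoder.of(schema);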
>>>>>
>>>>> On Thu, Sep 27, 2018 at 6:05 AM rahul patwari <
>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a use case to read data from Kafka that is serialized with
>>>>>> KafkaAvroSerializer, where the schema is present in the Schema Registry.
>>>>>>
>>>>>> When we try to use
>>>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer as the value
>>>>>> deserializer to get a GenericRecord, we are seeing errors.
>>>>>>
>>>>>> Does KafkaIO.read() support reading from the Schema Registry and using
>>>>>> the Confluent KafkaAvroSerDe?
>>>>>>
>>>>>> Regards,
>>>>>> Rahul
>>>>>>
>>>>>

Re: Kafka Avro Schema Registry Support

Posted by rahul patwari <ra...@gmail.com>.
Thanks, Ismael, for the update.
Thanks, Alexey, for the enhancement.
We will test it with the 2.20.0 release.
