Posted to dev@beam.apache.org by Matt Casters <ma...@neo4j.com> on 2022/02/08 13:16:28 UTC

KafkaIO.write and Avro

Dear Beams,

When sending Avro values to Kafka, say GenericRecord, you typically set the
option value.serializer to
"io.confluent.kafka.serializers.KafkaAvroSerializer".  This, along with a
bunch of other options for authentication and so on, validates the schema
stored in the Avro record against a schema registry.  Unfortunately, I
couldn't figure out how to pass this serializer class to KafkaIO.write(),
as it isn't accepted by the withValueSerializer() method.

For KafkaIO.read() we made a specific provision in the form of
class ConfluentSchemaRegistryDeserializer but it doesn't look like we
covered the producer side of Avro values yet.

I'd be happy to dive into the code to add proper support for a Confluent
schema registry in KafkaIO.write() but I was just wondering if there was
something I might have overlooked.  It's hard to find samples or
documentation on producing Avro messages with Beam.
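
For reference, this is roughly the shape of the pipeline I'm after (just a
sketch; the broker, topic and registry URL are placeholders, and the
commented-out line is the call that doesn't type-check):

import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

// Config that KafkaAvroSerializer would need to find the registry:
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("schema.registry.url", "http://my-registry:8081");

PCollection<KV<String, GenericRecord>> records = /* produced upstream */ null;
records.apply(KafkaIO.<String, GenericRecord>write()
    .withBootstrapServers("broker:9092")
    .withTopic("avro-topic")
    .withKeySerializer(StringSerializer.class)
    .withProducerConfigUpdates(producerConfig)
    // Rejected by the compiler: KafkaAvroSerializer implements
    // Serializer<Object>, not Serializer<GenericRecord>:
    // .withValueSerializer(KafkaAvroSerializer.class)
);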

Thanks in advance,

Matt

Re: KafkaIO.write and Avro

Posted by Brian Hulette <bh...@google.com>.
Oh yes, I didn't mean to disagree with you, Matt. I just meant to point out
that what we have is close to supporting the normal way but is not quite
there, perhaps frustratingly so, since our preferred way is stricter about
types than the normal way.

Brian

On Wed, Feb 9, 2022 at 11:57 PM Matt Casters <ma...@neo4j.com> wrote:

> Hi Brian,
>
> My main point is that KafkaIO (and other IOs as well) tends to be
> restrictive towards the API user.
> In this case, for example, errors are thrown at runtime if you don't set
> the serializers through the Beam API. [1]
> That strictness is meant to help the inexperienced Kafka user, which is
> great, but here it blocked me from getting the job done.
>
> Hope that this clarifies things.
> Cheers,
> Matt
>
> [1]
> https://github.com/apache/beam/blob/8b213c617ef8cf3a077bb0002b6b0fec8e85cb05/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2500
>
>
>
> On Wed, Feb 9, 2022 at 8:05 PM Brian Hulette <bh...@google.com> wrote:
>
>> If the normal way is just to set the producer config, we do have an API
>> for that, e.g. withProducerConfigUpdates [1]. It's just not well defined
>> what takes precedence, and in fact it looks like we will simply overwrite
>> any serde configuration specified that way [2].
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withProducerConfigUpdates-java.util.Map-
>> [2]
>> https://github.com/apache/beam/blob/df907de8519e6a23bb6b016ff8593f103e739e61/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>>
>> On Wed, Feb 9, 2022 at 10:36 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> +Ismael
>>>
>>> Doing it the “normal” way, especially for Kafka, may require some
>>> additional non-obvious steps (which, of course, can be documented). So,
>>> I’d prefer to have a more user-friendly API around it, like the one we
>>> have for reading Avro messages with a schema stored in the Confluent
>>> Schema Registry, which simply extends the current API by adding a new
>>> method "withValueDeserializer(DeserializerProvider<V>)” and provides a
>>> new ConfluentSchemaRegistryDeserializerProvider class that encapsulates
>>> all the business logic. So I'd suggest following the same approach for
>>> the KafkaIO write part.
>>>
>>> Any thoughts on this?
>>>
>>> PS: For those (myself included) who are curious why KafkaIO has both
>>> coders and serdes at the same time, this Jira [1] is an interesting read
>>> (I just found it recently).
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-1573
>>>
>>> —
>>> Alexey
>>>
>>>
>>> On 9 Feb 2022, at 17:15, Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>> Good point. Doing things the "normal" way for users of the storage
>>> system is a good on-ramp. Conversely, having a "normal Beam" way is good
>>> for people who use Beam more than Kafka. Can we have both easily?
>>>
>>> Kenn
>>>
>>> On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <ma...@neo4j.com>
>>> wrote:
>>>
>>>> Of course, IMO it would also be fine not to force developers to use
>>>> withKeySerializer() / withValueSerializer() in the first place.
>>>> This way you could use the standard way of configuring the Kafka
>>>> serializer classes using properties as per the Kafka Consumer/Producer
>>>> documentation.
>>>>
>>>> Just an idea.
>>>> Matt
>>>>
>>>> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>
>>>>> Just a quick type check: is it the case that a Serializer<Object> is
>>>>> expected to be able to properly serde any subclass of object? More
>>>>> generally that any Serializer<? super V> should be able to properly serde
>>>>> V? Typically this isn't the case. Not saying we shouldn't make the proposed
>>>>> change, but it could result in surprises.
>>>>>
>>>>> Another possibility, based on experience with coders: I would highlight
>>>>> three types of serde that apply to Serializer just as they do to Coder:
>>>>>
>>>>> 1. handles just a single type (VarIntCoder, etc)
>>>>> 2. lossy/converts concrete types because it is allowed (ListCoder
>>>>> works for any list, but does *not* restore the original concrete
>>>>> subclass)
>>>>> 3. generic/tagging (SerializableCoder which restores the concrete
>>>>> subclass)
>>>>>
>>>>> The API in KafkaIO is right for types 1 and 2 but too strict for type 3.
>>>>> The new API would be great for type 3, but potentially dangerous for
>>>>> types 2 and 1 (though for type 1 it will mostly be irrelevant).
>>>>>
>>>>> We could have a separate entrypoint for type 3, like
>>>>> .withGenericValueCoder(Serializer<? super V>) that makes it very clear that
>>>>> if you call this one you have to pass a Serializer that tags the concrete
>>>>> subclass and restores it. Often, the only relevant type will be
>>>>> Serializer<Object> so we could even make that the parameter type.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bh...@google.com>
>>>>> wrote:
>>>>>
>>>>>> The issue here is that KafkaAvroSerializer implements
>>>>>> Serializer<Object>, and not Serializer<GenericRecord> [1]. So you need to
>>>>>> erase the type to force it. I think Moritz's suggestion is actually to
>>>>>> update the signature here [2] to make the type parameter `? super V`, so
>>>>>> that a Serializer<Object> will be acceptable. That change would be
>>>>>> preferable to updating the docs.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>>>>>> [2]
>>>>>> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>>>>>>
>>>>>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sa...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks both, that's great -
>>>>>>>
>>>>>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <ma...@neo4j.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>>>>>>
>>>>>>>> You sort of get on the wrong track since my favorite IDE suggests:
>>>>>>>>
>>>>>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>>>>>> KafkaAvroSerializer.class)
>>>>>>>>
>>>>>>>> ... which simply doesn't even compile for me.
>>>>>>>>
>>>>>>>>  incompatible types:
>>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>>>>>>> be converted to java.lang.Class<? extends
>>>>>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>>>>>
>>>>>>>> It sort of puts you on the wrong footing, hence my question.
>>>>>>>> If you don't mind I'll simply create a PR to amend the Javadoc for
>>>>>>>> KafkaIO.
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>>>>>
>>>>>>>> Easier to figure out was AvroCoder.of(schema) but it might make
>>>>>>>> sense to document that in the same context as well.
>>>>>>>>
>>>>>>>> Thanks again!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Just having a quick look, it looks like the respective interface
>>>>>>>>> in KafkaIO should rather look like this to support KafkaAvroSerializer,
>>>>>>>>> which is a Serializer<Object>:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<?
>>>>>>>>> super V>> valueSerializer)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thoughts?
>>>>>>>>>
>>>>>>>>> Cheers, Moritz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From: *Moritz Mack <mm...@talend.com>
>>>>>>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>>>>>>> *To: *dev@beam.apache.org <de...@beam.apache.org>,
>>>>>>>>> matt.casters@neo4j.com <ma...@neo4j.com>
>>>>>>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>>>>>>
>>>>>>>>> Hi Matt,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Unfortunately, the types don’t play well when using
>>>>>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>>>>>
>>>>>>>>> The following will work:
>>>>>>>>>
>>>>>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This seems to be the cause of repeated confusion, so probably
>>>>>>>>> worth improving the user experience here!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Moritz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From: *Matt Casters <ma...@neo4j.com>
>>>>>>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>>>>>>> *To: *Beam Development List <de...@beam.apache.org>
>>>>>>>>> *Subject: *KafkaIO.write and Avro
>>>>>>>>>
>>>>>>>>> Dear Beams,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> When sending Avro values to Kafka, say GenericRecord, you
>>>>>>>>> typically specify option value.serializer as
>>>>>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>>>>>>>>> bunch of other options for authentication and so on verifies the schema
>>>>>>>>> stored in the Avro record with a schema registry.   Unfortunately, I
>>>>>>>>> couldn't figure out how to pass this serializer class to KafkaIO.write() as
>>>>>>>>> it's not acceptable to the withValueSerializer() method.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> For KafkaIO.read() we made a specific provision in the form of
>>>>>>>>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>>>>>>>>> covered the producer side of Avro values yet.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I'd be happy to dive into the code to add proper support for a
>>>>>>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>>>>>>> there was something I might have overlooked.  It's hard to find samples or
>>>>>>>>> documentation on producing Avro messages with Beam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks in advance,
>>>>>>>>>
>>>>>>>>> Matt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Neo4j Chief Solutions Architect
>>>>>>>> *✉   *matt.casters@neo4j.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>
>>>> --
>>>> Neo4j Chief Solutions Architect
>>>> *✉   *matt.casters@neo4j.com
>>>>
>>>>
>>>>
>>>>
>>>
>
> --
> Neo4j Chief Solutions Architect
> *✉   *matt.casters@neo4j.com
>
>
>
>

Re: KafkaIO.write and Avro

Posted by Matt Casters <ma...@neo4j.com>.
Hi Brian,

My main point is that KafkaIO (and other IOs as well) tends to be restrictive
towards the API user.
In this case, for example, errors are thrown at runtime if you don't set the
serializers through the Beam API. [1]
That strictness is meant to help the inexperienced Kafka user, which is great,
but here it blocked me from getting the job done.
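
For instance, something like this (a sketch with placeholder broker, topic
and registry URL) compiles fine but is rejected by the precondition check
at [1], even though the producer config carries a valid value.serializer:

import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

// Serializer configured the plain Kafka way, not via withValueSerializer():
KafkaIO.<String, GenericRecord>write()
    .withBootstrapServers("broker:9092")
    .withTopic("avro-topic")
    .withKeySerializer(StringSerializer.class)
    .withProducerConfigUpdates(Map.of(
        "value.serializer", KafkaAvroSerializer.class.getName(),
        "schema.registry.url", "http://my-registry:8081"));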

Hope that this clarifies things.
Cheers,
Matt

[1]
https://github.com/apache/beam/blob/8b213c617ef8cf3a077bb0002b6b0fec8e85cb05/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2500



On Wed, Feb 9, 2022 at 8:05 PM Brian Hulette <bh...@google.com> wrote:

> If the normal way is just to set the producer config, we do have an API for
> that, e.g. withProducerConfigUpdates [1]. It's just not well defined what
> takes precedence, and in fact it looks like we will simply overwrite any
> serde configuration specified that way [2].
>
> [1]
> https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withProducerConfigUpdates-java.util.Map-
> [2]
> https://github.com/apache/beam/blob/df907de8519e6a23bb6b016ff8593f103e739e61/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>
> On Wed, Feb 9, 2022 at 10:36 AM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
>> +Ismael
>>
>> Doing it the “normal” way, especially for Kafka, may require some
>> additional non-obvious steps (which, of course, can be documented). So,
>> I’d prefer to have a more user-friendly API around it, like the one we
>> have for reading Avro messages with a schema stored in the Confluent
>> Schema Registry, which simply extends the current API by adding a new
>> method "withValueDeserializer(DeserializerProvider<V>)” and provides a
>> new ConfluentSchemaRegistryDeserializerProvider class that encapsulates
>> all the business logic. So I'd suggest following the same approach for
>> the KafkaIO write part.
>>
>> Any thoughts on this?
>>
>> PS: For those (myself included) who are curious why KafkaIO has both
>> coders and serdes at the same time, this Jira [1] is an interesting read
>> (I just found it recently).
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-1573
>>
>> —
>> Alexey
>>
>>
>> On 9 Feb 2022, at 17:15, Kenneth Knowles <ke...@apache.org> wrote:
>>
>> Good point. Doing things the "normal" way for users of the storage system
>> is a good on-ramp. Conversely, having a "normal Beam" way is good for
>> people who use Beam more than Kafka. Can we have both easily?
>>
>> Kenn
>>
>> On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <ma...@neo4j.com>
>> wrote:
>>
>>> Of course, IMO it would also be fine not to force developers to use
>>> withKeySerializer() / withValueSerializer() in the first place.
>>> This way you could use the standard way of configuring the Kafka
>>> serializer classes using properties as per the Kafka Consumer/Producer
>>> documentation.
>>>
>>> Just an idea.
>>> Matt
>>>
>>> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>> Just a quick type check: is it the case that a Serializer<Object> is
>>>> expected to be able to properly serde any subclass of object? More
>>>> generally that any Serializer<? super V> should be able to properly serde
>>>> V? Typically this isn't the case. Not saying we shouldn't make the proposed
>>>> change, but it could result in surprises.
>>>>
>>>> Another possibility, based on experience with coders: I would highlight
>>>> three types of serde that apply to Serializer just as they do to Coder:
>>>>
>>>> 1. handles just a single type (VarIntCoder, etc)
>>>> 2. lossy/converts concrete types because it is allowed (ListCoder works
>>>> for any list, but does *not* restore the original concrete subclass)
>>>> 3. generic/tagging (SerializableCoder which restores the concrete
>>>> subclass)
>>>>
>>>> The API in KafkaIO is right for types 1 and 2 but too strict for type 3.
>>>> The new API would be great for type 3, but potentially dangerous for
>>>> types 2 and 1 (though for type 1 it will mostly be irrelevant).
>>>>
>>>> We could have a separate entrypoint for type 3, like
>>>> .withGenericValueCoder(Serializer<? super V>) that makes it very clear that
>>>> if you call this one you have to pass a Serializer that tags the concrete
>>>> subclass and restores it. Often, the only relevant type will be
>>>> Serializer<Object> so we could even make that the parameter type.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bh...@google.com>
>>>> wrote:
>>>>
>>>>> The issue here is that KafkaAvroSerializer implements
>>>>> Serializer<Object>, and not Serializer<GenericRecord> [1]. So you need to
>>>>> erase the type to force it. I think Moritz's suggestion is actually to
>>>>> update the signature here [2] to make the type parameter `? super V`, so
>>>>> that a Serializer<Object> will be acceptable. That change would be
>>>>> preferable to updating the docs.
>>>>>
>>>>> [1]
>>>>> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>>>>>
>>>>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sa...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks both, that's great -
>>>>>>
>>>>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <ma...@neo4j.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>>>>>
>>>>>>> You sort of get on the wrong track since my favorite IDE suggests:
>>>>>>>
>>>>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>>>>> KafkaAvroSerializer.class)
>>>>>>>
>>>>>>> ... which simply doesn't even compile for me.
>>>>>>>
>>>>>>>  incompatible types:
>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>>>>>> be converted to java.lang.Class<? extends
>>>>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>>>>
>>>>>>> It sort of puts you on the wrong footing, hence my question.
>>>>>>> If you don't mind I'll simply create a PR to amend the Javadoc for
>>>>>>> KafkaIO.
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>>>>
>>>>>>> Easier to figure out was AvroCoder.of(schema) but it might make
>>>>>>> sense to document that in the same context as well.
>>>>>>>
>>>>>>> Thanks again!
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Matt
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>>>>
>>>>>>>> Just having a quick look, it looks like the respective interface in
>>>>>>>> KafkaIO should rather look like this to support KafkaAvroSerializer, which
>>>>>>>> is a Serializer<Object>:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<?
>>>>>>>> super V>> valueSerializer)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> Cheers, Moritz
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *From: *Moritz Mack <mm...@talend.com>
>>>>>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>>>>>> *To: *dev@beam.apache.org <de...@beam.apache.org>,
>>>>>>>> matt.casters@neo4j.com <ma...@neo4j.com>
>>>>>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>>>>>
>>>>>>>> Hi Matt,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Unfortunately, the types don’t play well when using
>>>>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>>>>
>>>>>>>> The following will work:
>>>>>>>>
>>>>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> This seems to be the cause of repeated confusion, so probably worth
>>>>>>>> improving the user experience here!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Moritz
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *From: *Matt Casters <ma...@neo4j.com>
>>>>>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>>>>>> *To: *Beam Development List <de...@beam.apache.org>
>>>>>>>> *Subject: *KafkaIO.write and Avro
>>>>>>>>
>>>>>>>> Dear Beams,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>>>>>> specify option value.serializer as
>>>>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>>>>>>>> bunch of other options for authentication and so on verifies the schema
>>>>>>>> stored in the Avro record with a schema registry.   Unfortunately, I
>>>>>>>> couldn't figure out how to pass this serializer class to KafkaIO.write() as
>>>>>>>> it's not acceptable to the withValueSerializer() method.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> For KafkaIO.read() we made a specific provision in the form of
>>>>>>>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>>>>>>>> covered the producer side of Avro values yet.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I'd be happy to dive into the code to add proper support for a
>>>>>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>>>>>> there was something I might have overlooked.  It's hard to find samples or
>>>>>>>> documentation on producing Avro messages with Beam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks in advance,
>>>>>>>>
>>>>>>>> Matt
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Neo4j Chief Solutions Architect
>>>>>>> *✉   *matt.casters@neo4j.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>
>>> --
>>> Neo4j Chief Solutions Architect
>>> *✉   *matt.casters@neo4j.com
>>>
>>>
>>>
>>>
>>

-- 
Neo4j Chief Solutions Architect
*✉   *matt.casters@neo4j.com

Re: KafkaIO.write and Avro

Posted by Brian Hulette <bh...@google.com>.
If the normal way is just to set the producer config, we do have an API for
that, e.g. withProducerConfigUpdates [1]. It's just not well defined what
takes precedence, and in fact it looks like we will simply overwrite any
serde configuration specified that way [2].

[1]
https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withProducerConfigUpdates-java.util.Map-
[2]
https://github.com/apache/beam/blob/df907de8519e6a23bb6b016ff8593f103e739e61/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
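
To illustrate the overlap (a sketch; broker, topic and registry URL are
placeholders, and the raw cast is the workaround from earlier in the thread):

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> updates = new HashMap<>();
// Read by KafkaAvroSerializer when the producer instantiates it:
updates.put("schema.registry.url", "http://my-registry:8081");
// Per [2], this entry appears to be overwritten by whatever was set
// through withKeySerializer()/withValueSerializer():
updates.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

KafkaIO.<String, GenericRecord>write()
    .withBootstrapServers("broker:9092")
    .withTopic("avro-topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer((Class) KafkaAvroSerializer.class) // raw-cast workaround
    .withProducerConfigUpdates(updates);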

On Wed, Feb 9, 2022 at 10:36 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> +Ismael
>
> Doing it the “normal” way, especially for Kafka, may require some
> additional non-obvious steps (which, of course, can be documented). So,
> I’d prefer to have a more user-friendly API around it, like the one we
> have for reading Avro messages with a schema stored in the Confluent
> Schema Registry, which simply extends the current API by adding a new
> method "withValueDeserializer(DeserializerProvider<V>)” and provides a
> new ConfluentSchemaRegistryDeserializerProvider class that encapsulates
> all the business logic. So I'd suggest following the same approach for
> the KafkaIO write part.
>
> Any thoughts on this?
>
> PS: For those (myself included) who are curious why KafkaIO has both
> coders and serdes at the same time, this Jira [1] is an interesting read
> (I just found it recently).
>
> [1] https://issues.apache.org/jira/browse/BEAM-1573
>
> —
> Alexey
>
>
> On 9 Feb 2022, at 17:15, Kenneth Knowles <ke...@apache.org> wrote:
>
> Good point. Doing things the "normal" way for users of the storage system
> is a good on-ramp. Conversely, having a "normal Beam" way is good for
> people who use Beam more than Kafka. Can we have both easily?
>
> Kenn
>
> On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <ma...@neo4j.com>
> wrote:
>
>> Of course, IMO it would also be fine not to force developers to use
>> withKeySerializer() / withValueSerializer() in the first place.
>> This way you could use the standard way of configuring the Kafka
>> serializer classes using properties as per the Kafka Consumer/Producer
>> documentation.
>>
>> Just an idea.
>> Matt
>>
>> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> Just a quick type check: is it the case that a Serializer<Object> is
>>> expected to be able to properly serde any subclass of object? More
>>> generally that any Serializer<? super V> should be able to properly serde
>>> V? Typically this isn't the case. Not saying we shouldn't make the proposed
>>> change, but it could result in surprises.
>>>
>>> Another possibility, based on experience with coders: I would highlight
>>> three types of serde that apply to Serializer just as they do to Coder:
>>>
>>> 1. handles just a single type (VarIntCoder, etc)
>>> 2. lossy/converts concrete types because it is allowed (ListCoder works
>>> for any list, but does *not* restore the original concrete subclass)
>>> 3. generic/tagging (SerializableCoder which restores the concrete
>>> subclass)
>>>
>>> The API in KafkaIO is right for types 1 and 2 but too strict for type 3.
>>> The new API would be great for type 3, but potentially dangerous for
>>> types 2 and 1 (though for type 1 it will mostly be irrelevant).
>>>
>>> We could have a separate entrypoint for type 3, like
>>> .withGenericValueCoder(Serializer<? super V>) that makes it very clear that
>>> if you call this one you have to pass a Serializer that tags the concrete
>>> subclass and restores it. Often, the only relevant type will be
>>> Serializer<Object> so we could even make that the parameter type.
>>>
>>> Kenn
>>>
>>> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bh...@google.com>
>>> wrote:
>>>
>>>> The issue here is that KafkaAvroSerializer implements
>>>> Serializer<Object>, and not Serializer<GenericRecord> [1]. So you need to
>>>> erase the type to force it. I think Moritz's suggestion is actually to
>>>> update the signature here [2] to make the type parameter `? super V`, so
>>>> that a Serializer<Object> will be acceptable. That change would be
>>>> preferable to updating the docs.
>>>>
>>>> [1]
>>>> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>>>> [2]
>>>> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>>>>
>>>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sa...@google.com>
>>>> wrote:
>>>>
>>>>> Thanks both, that's great -
>>>>>
>>>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <ma...@neo4j.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>>>>
>>>>>> You sort of get on the wrong track since my favorite IDE suggests:
>>>>>>
>>>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>>>> KafkaAvroSerializer.class)
>>>>>>
>>>>>> ... which simply doesn't even compile for me.
>>>>>>
>>>>>>  incompatible types:
>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>>>>> be converted to java.lang.Class<? extends
>>>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>>>
>>>>>> It sort of puts you on the wrong footing, hence my question.
>>>>>> If you don't mind I'll simply create a PR to amend the Javadoc for
>>>>>> KafkaIO.
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>>>
>>>>>> Easier to figure out was AvroCoder.of(schema) but it might make sense
>>>>>> to document that in the same context as well.
>>>>>>
>>>>>> Thanks again!
>>>>>>
>>>>>> Cheers,
>>>>>> Matt
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>>>
>>>>>>> Just having a quick look, it looks like the respective interface in
>>>>>>> KafkaIO should rather look like this to support KafkaAvroSerializer, which
>>>>>>> is a Serializer<Object>:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<?
>>>>>>> super V>> valueSerializer)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thoughts?
>>>>>>>
>>>>>>> Cheers, Moritz
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From: *Moritz Mack <mm...@talend.com>
>>>>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>>>>> *To: *dev@beam.apache.org <de...@beam.apache.org>,
>>>>>>> matt.casters@neo4j.com <ma...@neo4j.com>
>>>>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>>>>
>>>>>>> Hi Matt,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Unfortunately, the types don’t play well when using
>>>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>>>
>>>>>>> The following will work:
>>>>>>>
>>>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> This seems to be the cause of repeated confusion, so probably worth
>>>>>>> improving the user experience here!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Moritz
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From: *Matt Casters <ma...@neo4j.com>
>>>>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>>>>> *To: *Beam Development List <de...@beam.apache.org>
>>>>>>> *Subject: *KafkaIO.write and Avro
>>>>>>>
>>>>>>> Dear Beams,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>>>>> specify option value.serializer as
>>>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>>>>>>> bunch of other options for authentication and so on verifies the schema
>>>>>>> stored in the Avro record with a schema registry.   Unfortunately, I
>>>>>>> couldn't figure out how to pass this serializer class to KafkaIO.write() as
>>>>>>> it's not acceptable to the withValueSerializer() method.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> For KafkaIO.read() we made a specific provision in the form of
>>>>>>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>>>>>>> covered the producer side of Avro values yet.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I'd be happy to dive into the code to add proper support for a
>>>>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>>>>> there was something I might have overlooked.  It's hard to find samples or
>>>>>>> documentation on producing Avro messages with Beam.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>>
>>>>>>> Matt
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Neo4j Chief Solutions Architect
>>>>>> *✉   *matt.casters@neo4j.com
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>
>> --
>> Neo4j Chief Solutions Architect
>> *✉   *matt.casters@neo4j.com
>>
>>
>>
>>
>

Re: KafkaIO.write and Avro

Posted by Alexey Romanenko <ar...@gmail.com>.
+Ismael

Doing it the “normal” way, especially for Kafka, may require some additional non-obvious steps (which, of course, can be documented). So, I’d prefer to have a more user-friendly API around it, like the one we have for reading Avro messages with a schema stored in the Confluent Schema Registry, which simply extends the current API by adding a new method "withValueDeserializer(DeserializerProvider<V>)” and provides a new ConfluentSchemaRegistryDeserializerProvider class that encapsulates all the business logic. So I'd suggest following the same approach for the KafkaIO write part.

Any thoughts on this?
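
To sketch the shape: the read side below uses the existing
ConfluentSchemaRegistryDeserializerProvider, while the write-side names are
made up purely for illustration (nothing like them exists yet):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Read side, as it works today (registry URL and subject are placeholders):
KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers("broker:9092")
    .withTopic("avro-topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(
        ConfluentSchemaRegistryDeserializerProvider.of(
            "http://my-registry:8081", "avro-topic-value"));

// Hypothetical write-side counterpart, for illustration only:
KafkaIO.<String, GenericRecord>write()
    .withBootstrapServers("broker:9092")
    .withTopic("avro-topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(
        ConfluentSchemaRegistrySerializerProvider.of(   // does not exist (yet)
            "http://my-registry:8081", "avro-topic-value"));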

PS: For those (myself included) who are curious why KafkaIO has both coders and serdes at the same time, this Jira [1] is an interesting read (I just found it recently).

[1] https://issues.apache.org/jira/browse/BEAM-1573

—
Alexey


> On 9 Feb 2022, at 17:15, Kenneth Knowles <ke...@apache.org> wrote:
> 
> Good point. Doing things the "normal" way for users of the storage system is a good on-ramp. Conversely, having a "normal Beam" way is good for people who use Beam more than Kafka. Can we have both easily?
> 
> Kenn
> 
> On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <matt.casters@neo4j.com <ma...@neo4j.com>> wrote:
> Of course, IMO it would also be fine not to force developers to use withKeySerializer() / withValueSerializer() in the first place.
> This way you could use the standard way of configuring the Kafka serializer classes using properties as per the Kafka Consumer/Producer documentation.
> 
> Just an idea.
> Matt
> 
> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <kenn@apache.org <ma...@apache.org>> wrote:
> Just a quick type check: is it the case that a Serializer<Object> is expected to be able to properly serde any subclass of object? More generally that any Serializer<? super V> should be able to properly serde V? Typically this isn't the case. Not saying we shouldn't make the proposed change, but it could result in surprises.
> 
> Another possibility, based on experience with coders: I would highlight three types of serde that apply to Serializer just as they do to Coder:
> 
> 1. handles just a single type (VarIntCoder, etc)
> 2. lossy/converts concrete types because it is allowed (ListCoder works for any list, but does not restore the original concrete subclass)
> 3. generic/tagging (SerializableCoder which restores the concrete subclass)
> 
> The API in KafkaIO is right for types 1 and 2 but too strict for type 3. The new API would be great for type 3, but potentially dangerous for types 2 and 1 (though for type 1 it will mostly be irrelevant).
> 
> We could have a separate entrypoint for type 3, like .withGenericValueCoder(Serializer<? super V>) that makes it very clear that if you call this one you have to pass a Serializer that tags the concrete subclass and restores it. Often, the only relevant type will be Serializer<Object> so we could even make that the parameter type.
> 
> Kenn
> 
> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhulette@google.com <ma...@google.com>> wrote:
> The issue here is that KafkaAvroSerializer implements Serializer<Object>, and not Serializer<GenericRecord> [1]. So you need to erase the type to force it. I think Moritz's suggestion is actually to update the signature here [2] to make the type parameter `? super V`, so that a Serializer<Object> will be acceptable. That change would be preferable to updating the docs.
> 
> [1] https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
> [2] https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachinag@google.com <ma...@google.com>> wrote:
> Thanks both, that's great - 
> 
> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.casters@neo4j.com <ma...@neo4j.com>> wrote:
> Thanks a lot Moritz.  Your suggestion worked immediately.
> 
> You sort of get on the wrong track since my favorite IDE suggests: 
> 
> .withValueSerializer((Class<? extends Serializer<GenericRecord>>) KafkaAvroSerializer.class)
> 
> ... which simply doesn't even compile for me. 
> 
>  incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot be converted to java.lang.Class<? extends org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
> 
> It sort of puts you on the wrong footing, hence my question.
> If you don't mind I'll simply create a PR to amend the Javadoc for KafkaIO.
> 
> https://issues.apache.org/jira/browse/BEAM-13854
> 
> Easier to figure out was AvroCoder.of(schema) but it might make sense to document that in the same context as well.  
> 
> Thanks again!
> 
> Cheers,
> Matt
> 
> 
> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mmack@talend.com <ma...@talend.com>> wrote:
> Just having a quick look, it looks like the respective interface in KafkaIO should rather look like this to support KafkaAvroSerializer, which is a Serializer<Object>:
> 
>  
> 
> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super V>> valueSerializer)
> 
>  
> 
> Thoughts?
> 
> Cheers, Moritz
> 
>  
> 
> From: Moritz Mack <mmack@talend.com <ma...@talend.com>>
> Date: Tuesday, 8. February 2022 at 15:55
> To: dev@beam.apache.org <ma...@beam.apache.org> <dev@beam.apache.org <ma...@beam.apache.org>>, matt.casters@neo4j.com <ma...@neo4j.com> <matt.casters@neo4j.com <ma...@neo4j.com>>
> Subject: Re: KafkaIO.write and Avro
> 
> Hi Matt,
> 
>  
> 
> Unfortunately, the types don’t play well when using KafkaAvroSerializer. It currently requires a cast :/
> 
> The following will work:
> 
> write.withValueSerializer((Class) KafkaAvroSerializer.class)
> 
>  
> 
> This seems to be the cause of repeated confusion, so probably worth improving the user experience here!
> 
>  
> 
> Cheers,
> 
> Moritz
> 
>  
> 
> From: Matt Casters <matt.casters@neo4j.com <ma...@neo4j.com>>
> Date: Tuesday, 8. February 2022 at 14:17
> To: Beam Development List <dev@beam.apache.org <ma...@beam.apache.org>>
> Subject: KafkaIO.write and Avro
> 
> Dear Beams,
> 
>  
> 
> When sending Avro values to Kafka, say GenericRecord, you typically specify option value.serializer as "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a bunch of other options for authentication and so on verifies the schema stored in the Avro record with a schema registry.   Unfortunately, I couldn't figure out how to pass this serializer class to KafkaIO.write() as it's not acceptable to the withValueSerializer() method.
> 
>  
> 
> For KafkaIO.read() we made a specific provision in the form of class ConfluentSchemaRegistryDeserializer but it doesn't look like we covered the producer side of Avro values yet.
> 
>   
> 
> I'd be happy to dive into the code to add proper support for a Confluent schema registry in KafkaIO.write() but I was just wondering if there was something I might have overlooked.  It's hard to find samples or documentation on producing Avro messages with Beam.
> 
>  
> 
> Thanks in advance,
> 
> Matt
> 
>  
> 
> 
> 
> 
> -- 
> Neo4j Chief Solutions Architect
> ✉   matt.casters@neo4j.com <ma...@neo4j.com>
> 
> 
> 
> 
> 
> -- 
> Neo4j Chief Solutions Architect
> ✉   matt.casters@neo4j.com <ma...@neo4j.com>
> 
> 
> 


Re: KafkaIO.write and Avro

Posted by Kenneth Knowles <ke...@apache.org>.
Good point. Doing things the "normal" way for users of the storage system
is a good on-ramp. Conversely, having a "normal Beam" way is good for
people who use Beam more than Kafka. Can we have both easily?

Kenn

On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <ma...@neo4j.com> wrote:

> Of course, IMO it would also be fine not to force developers to use
> withKeySerializer() / withValueSerializer() in the first place.
> This way you could use the standard way of configuring the Kafka
> serializer classes using properties as per the Kafka Consumer/Producer
> documentation.
>
> Just an idea.
> Matt
>
> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Just a quick type check: is it the case that a Serializer<Object> is
>> expected to be able to properly serde any subclass of object? More
>> generally that any Serializer<? super V> should be able to properly serde
>> V? Typically this isn't the case. Not saying we shouldn't make the proposed
>> change, but it could result in surprises.
>>
>> Another possibility, based on experience with coders: I would highlight
>> three types of serde that apply to Serializer just as they do to Coder:
>>
>> 1. handles just a single type (VarIntCoder, etc)
>> 2. lossy/converts concrete types because it is allowed (ListCoder works
>> for any list, but does *not* restore the original concrete subclass)
>> 3. generic/tagging (SerializableCoder which restores the concrete
>> subclass)
>>
>> The API in KafkaIO is right for types 1 and 2 but too strict for type 3.
>> The new API would be great for type 3, but potentially dangerous for
>> types 2 and 1 (though for type 1 it will mostly be irrelevant).
>>
>> We could have a separate entrypoint for type 3, like
>> .withGenericValueCoder(Serializer<? super V>) that makes it very clear that
>> if you call this one you have to pass a Serializer that tags the concrete
>> subclass and restores it. Often, the only relevant type will be
>> Serializer<Object> so we could even make that the parameter type.
>>
>> Kenn
>>
>> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bh...@google.com>
>> wrote:
>>
>>> The issue here is that KafkaAvroSerializer implements
>>> Serializer<Object>, and not Serializer<GenericRecord> [1]. So you need to
>>> erase the type to force it. I think Moritz's suggestion is actually to
>>> update the signature here [2] to make the type parameter `? super V`, so
>>> that a Serializer<Object> will be acceptable. That change would be
>>> preferable to updating the docs.
>>>
>>> [1]
>>> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>>> [2]
>>> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>>>
>>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sa...@google.com>
>>> wrote:
>>>
>>>> Thanks both, that's great -
>>>>
>>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <ma...@neo4j.com>
>>>> wrote:
>>>>
>>>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>>>
>>>>> You sort of get on the wrong track since my favorite IDE suggests:
>>>>>
>>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>>> KafkaAvroSerializer.class)
>>>>>
>>>>> ... which simply doesn't even compile for me.
>>>>>
>>>>>  incompatible types:
>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>>>> be converted to java.lang.Class<? extends
>>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>>
>>>>> It sort of puts you on the wrong footing, hence my question.
>>>>> If you don't mind I'll simply create a PR to amend the Javadoc for
>>>>> KafkaIO.
>>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>>
>>>>> Easier to figure out was AvroCoder.of(schema) but it might make sense
>>>>> to document that in the same context as well.
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>>
>>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>>
>>>>>> Just having a quick look, it looks like the respective interface in
>>>>>> KafkaIO should rather look like this to support KafkaAvroSerializer, which
>>>>>> is a Serializer<Object>:
>>>>>>
>>>>>>
>>>>>>
>>>>>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super
>>>>>> V>> valueSerializer)
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Cheers, Moritz
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Moritz Mack <mm...@talend.com>
>>>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>>>> *To: *dev@beam.apache.org <de...@beam.apache.org>,
>>>>>> matt.casters@neo4j.com <ma...@neo4j.com>
>>>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>>>
>>>>>> Hi Matt,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Unfortunately, the types don’t play well when using
>>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>>
>>>>>> The following will work:
>>>>>>
>>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>>
>>>>>>
>>>>>>
>>>>>> This seems to be the cause of repeated confusion, so probably worth
>>>>>> improving the user experience here!
>>>>>>
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Moritz
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Matt Casters <ma...@neo4j.com>
>>>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>>>> *To: *Beam Development List <de...@beam.apache.org>
>>>>>> *Subject: *KafkaIO.write and Avro
>>>>>>
>>>>>> Dear Beams,
>>>>>>
>>>>>>
>>>>>>
>>>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>>>> specify option value.serializer as
>>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>>>>>> bunch of other options for authentication and so on verifies the schema
>>>>>> stored in the Avro record with a schema registry.   Unfortunately, I
>>>>>> couldn't figure out how to pass this serializer class to KafkaIO.write() as
>>>>>> it's not acceptable to the withValueSerializer() method.
>>>>>>
>>>>>>
>>>>>>
>>>>>> For KafkaIO.read() we made a specific provision in the form of
>>>>>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>>>>>> covered the producer side of Avro values yet.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I'd be happy to dive into the code to add proper support for a
>>>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>>>> there was something I might have overlooked.  It's hard to find samples or
>>>>>> documentation on producing Avro messages with Beam.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks in advance,
>>>>>>
>>>>>> Matt
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Neo4j Chief Solutions Architect
>>>>> *✉   *matt.casters@neo4j.com
>>>>>
>>>>>
>>>>>
>>>>>
>
> --
> Neo4j Chief Solutions Architect
> *✉   *matt.casters@neo4j.com
>
>
>
>

Re: KafkaIO.write and Avro

Posted by Matt Casters <ma...@neo4j.com>.
Of course, IMO it would also be fine not to force developers to use
withKeySerializer() / withValueSerializer() in the first place.
This way you could use the standard way of configuring the Kafka serializer
classes using properties as per the Kafka Consumer/Producer documentation.
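
That is, the standard Kafka client style (broker and registry URL are
placeholders; this is the plain Kafka producer API, not Beam):

import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://my-registry:8081");
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);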

Just an idea.
Matt

On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <ke...@apache.org> wrote:

> Just a quick type check: is it the case that a Serializer<Object> is
> expected to be able to properly serde any subclass of object? More
> generally that any Serializer<? super V> should be able to properly serde
> V? Typically this isn't the case. Not saying we shouldn't make the proposed
> change, but it could result in surprises.
>
> Another possibility, based on experience with coders: I would highlight
> three types of serde that apply to Serializer just as they do to Coder:
>
> 1. handles just a single type (VarIntCoder, etc)
> 2. lossy/converts concrete types because it is allowed (ListCoder works
> for any list, but does *not* restore the original concrete subclass)
> 3. generic/tagging (SerializableCoder which restores the concrete subclass)
>
> The API in KafkaIO is right for types 1 and 2 but too strict for type 3.
> The new API would be great for type 3, but potentially dangerous for
> types 2 and 1 (though for type 1 it will mostly be irrelevant).
>
> We could have a separate entrypoint for type 3, like
> .withGenericValueCoder(Serializer<? super V>) that makes it very clear that
> if you call this one you have to pass a Serializer that tags the concrete
> subclass and restores it. Often, the only relevant type will be
> Serializer<Object> so we could even make that the parameter type.
>
> Kenn
>
> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bh...@google.com> wrote:
>
>> The issue here is that KafkaAvroSerializer implements Serializer<Object>,
>> and not Serializer<GenericRecord> [1]. So you need to erase the type to
>> force it. I think Moritz's suggestion is actually to update the signature
>> here [2] to make the type parameter `? super V`, so that a
>> Serializer<Object> will be acceptable. That change would be preferable to
>> updating the docs.
>>
>> [1]
>> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>> [2]
>> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>>
>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sa...@google.com>
>> wrote:
>>
>>> Thanks both, that's great -
>>>
>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <ma...@neo4j.com>
>>> wrote:
>>>
>>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>>
>>>> You sort of get on the wrong track since my favorite IDE suggests:
>>>>
>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>> KafkaAvroSerializer.class)
>>>>
>>>> ... which simply doesn't even compile for me.
>>>>
>>>>  incompatible types:
>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>>> be converted to java.lang.Class<? extends
>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>
>>>> It sort of puts you on the wrong footing, hence my question.
>>>> If you don't mind I'll simply create a PR to amend the Javadoc for
>>>> KafkaIO.
>>>>
>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>
>>>> Easier to figure out was AvroCoder.of(schema) but it might make sense
>>>> to document that in the same context as well.
>>>>
>>>> Thanks again!
>>>>
>>>> Cheers,
>>>> Matt
>>>>
>>>>
>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>
>>>>> Just having a quick look, it looks like the respective interface in
>>>>> KafkaIO should rather look like this to support KafkaAvroSerializer, which
>>>>> is a Serializer<Object>:
>>>>>
>>>>>
>>>>>
>>>>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super
>>>>> V>> valueSerializer)
>>>>>
>>>>>
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Cheers, Moritz
>>>>>
>>>>>
>>>>>
>>>>> *From: *Moritz Mack <mm...@talend.com>
>>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>>> *To: *dev@beam.apache.org <de...@beam.apache.org>,
>>>>> matt.casters@neo4j.com <ma...@neo4j.com>
>>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>>
>>>>>
>>>>> Hi Matt,
>>>>>
>>>>>
>>>>>
>>>>> Unfortunately, the types don’t play well when using
>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>
>>>>> The following will work:
>>>>>
>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>
>>>>>
>>>>>
>>>>> This seems to be the cause of repeated confusion, so probably worth
>>>>> improving the user experience here!
>>>>>
>>>>>
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Moritz
>>>>>
>>>>>
>>>>>
>>>>> *From: *Matt Casters <ma...@neo4j.com>
>>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>>> *To: *Beam Development List <de...@beam.apache.org>
>>>>> *Subject: *KafkaIO.write and Avro
>>>>>
>>>>>
>>>>> Dear Beams,
>>>>>
>>>>>
>>>>>
>>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>>> specify option value.serializer as
>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>>>>> bunch of other options for authentication and so on verifies the schema
>>>>> stored in the Avro record with a schema registry.   Unfortunately, I
>>>>> couldn't figure out how to pass this serializer class to KafkaIO.write() as
>>>>> it's not acceptable to the withValueSerializer() method.
>>>>>
>>>>>
>>>>>
>>>>> For KafkaIO.read() we made a specific provision in the form of
>>>>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>>>>> covered the producer side of Avro values yet.
>>>>>
>>>>>
>>>>>
>>>>> I'd be happy to dive into the code to add proper support for a
>>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>>> there was something I might have overlooked.  It's hard to find samples or
>>>>> documentation on producing Avro messages with Beam.
>>>>>
>>>>>
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Matt
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Neo4j Chief Solutions Architect
>>>> *✉   *matt.casters@neo4j.com
>>>>
>>>>
>>>>
>>>>

-- 
Neo4j Chief Solutions Architect
*✉   *matt.casters@neo4j.com

Re: KafkaIO.write and Avro

Posted by Kenneth Knowles <ke...@apache.org>.
Just a quick type check: is it the case that a Serializer<Object> is
expected to be able to properly serde any subclass of Object? More
generally, should any Serializer<? super V> be able to properly serde V?
Typically this isn't the case. Not saying we shouldn't make the proposed
change, but it could result in surprises.
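
For instance, KafkaAvroSerializer is declared as a Serializer<Object>, but
at runtime it only accepts Avro-compatible values. A quick sketch (the
'registryClient' and 'config' variables stand in for real setup):

  Serializer<Object> s = new KafkaAvroSerializer(registryClient, config);
  s.serialize("my_topic", genericRecord);       // fine: Avro-compatible value
  s.serialize("my_topic", new StringBuilder()); // fails at runtime: not an Avro type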

Another thought, based on experience with coders: I would highlight three
types of serde that apply to Serializer just as they do to Coder:

1. handles just a single type (VarIntCoder, etc)
2. lossy/converts concrete types because it is allowed (ListCoder works for
any list, but does *not* restore the original concrete subclass)
3. generic/tagging (SerializableCoder which restores the concrete subclass)
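
To make the distinction concrete with coders, a sketch using
CoderUtils.clone (org.apache.beam.sdk.util) to round-trip a value;
exception handling omitted:

  LinkedList<Integer> in = new LinkedList<>(Arrays.asList(1, 2, 3));

  // Type 2: the contents round-trip, the concrete subclass does not.
  List<Integer> out = CoderUtils.clone(ListCoder.of(VarIntCoder.of()), in);

  // Type 3: Java serialization tags the class, so the subclass is restored
  // (the assignment is unchecked because LinkedList.class is raw here).
  LinkedList<Integer> same = CoderUtils.clone(SerializableCoder.of(LinkedList.class), in);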

The API in KafkaIO is right for types 1 and 2 but too strict for type 3.
The new API is great for type 3, but potentially dangerous for types 1 and 2
(though for type 1 it will mostly be irrelevant).

We could have a separate entrypoint for type 3, like
.withGenericValueSerializer(Serializer<? super V>), that makes it very clear
that if you call this one you have to pass a Serializer that tags the
concrete subclass and restores it. Often, the only relevant type will be
Serializer<Object>, so we could even make that the parameter type.
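
Roughly like this (a sketch only; the method name is hypothetical and the
raw cast is deliberate, since the caller is promising type-3 behavior):

  public Write<K, V> withGenericValueSerializer(
      Class<? extends Serializer<Object>> valueSerializer) {
    // The caller guarantees the serializer tags and restores the concrete subclass.
    return withValueSerializer((Class) valueSerializer);
  }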

Kenn

On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bh...@google.com> wrote:

> The issue here is that KafkaAvroSerializer implements Serializer<Object>,
> and not Serializer<GenericRecord> [1]. So you need to erase the type to
> force it. I think Moritz's suggestion is actually to update the signature
> here [2] to make the type parameter `? super V`, so that a
> Serializer<Object> will be acceptable. That change would be preferable to
> updating the docs.
>
> [1]
> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
> [2]
> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>
> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sa...@google.com> wrote:
>
>> Thanks both, that's great -
>>
>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <ma...@neo4j.com>
>> wrote:
>>
>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>
>>> You sort of get on the wrong track since my favorite IDE suggests:
>>>
>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>> KafkaAvroSerializer.class)
>>>
>>> ... which simply doesn't even compile for me.
>>>
>>>  incompatible types:
>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>> be converted to java.lang.Class<? extends
>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>
>>> It sort of puts you on the wrong footing hence my question.
>>> If you don't mind I'll simply create a PR to amend the Javadoc for
>>> KafkaIO.
>>>
>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>
>>> Easier to figure out was AvroCoder.of(schema) but it might make sense to
>>> document that in the same context as well.
>>>
>>> Thanks again!
>>>
>>> Cheers,
>>> Matt
>>>
>>>
>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>
>>>> Just having a quick look, it looks like the respective interface in
>>>> KafkaIO should rather look like this to support KafkaAvroSerializer, which
>>>> is a Serializer<Object>:
>>>>
>>>>
>>>>
>>>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super
>>>> V>> valueSerializer)
>>>>
>>>>
>>>>
>>>> Thoughts?
>>>>
>>>> Cheers, Moritz
>>>>
>>>>
>>>>
>>>> *From: *Moritz Mack <mm...@talend.com>
>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>> *To: *dev@beam.apache.org <de...@beam.apache.org>, matt.casters@neo4j.com
>>>> <ma...@neo4j.com>
>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>
>>>>
>>>> Hi Matt,
>>>>
>>>>
>>>>
>>>> Unfortunately, the types don’t play well when using
>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>
>>>> The following will work:
>>>>
>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>
>>>>
>>>>
>>>> This seems to be the cause of repeated confusion, so probably worth
>>>> improving the user experience here!
>>>>
>>>>
>>>>
>>>> Cheers,
>>>>
>>>> Moritz
>>>>
>>>>
>>>>
>>>> *From: *Matt Casters <ma...@neo4j.com>
>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>> *To: *Beam Development List <de...@beam.apache.org>
>>>> *Subject: *KafkaIO.write and Avro
>>>>
>>>>
>>>> Dear Beams,
>>>>
>>>>
>>>>
>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>> specify option value.serializer as
>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>>>> bunch of other options for authentication and so on verifies the schema
>>>> stored in the Avro record with a schema registry.   Unfortunately, I
>>>> couldn't figure out how to pass this serializer class to KafkaIO.write() as
>>>> it's not acceptable to the withValueSerializer() method.
>>>>
>>>>
>>>>
>>>> For KafkaIO.read() we made a specific provision in the form of
>>>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>>>> covered the producer side of Avro values yet.
>>>>
>>>>
>>>>
>>>> I'd be happy to dive into the code to add proper support for a
>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>> there was something I might have overlooked.  It's hard to find samples or
>>>> documentation on producing Avro messages with Beam.
>>>>
>>>>
>>>>
>>>> Thanks in advance,
>>>>
>>>> Matt
>>>>
>>>>
>>>>
>>>
>>> --
>>> Neo4j Chief Solutions Architect
>>> *✉   *matt.casters@neo4j.com
>>>
>>>
>>>
>>>

Re: KafkaIO.write and Avro

Posted by Brian Hulette <bh...@google.com>.
The issue here is that KafkaAvroSerializer implements Serializer<Object>,
and not Serializer<GenericRecord> [1]. So you need to erase the type to
force it. I think Moritz's suggestion is actually to update the signature
here [2] to make the type parameter `? super V`, so that a
Serializer<Object> will be acceptable. That change would be preferable to
updating the docs.

[1]
https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
[2]
https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
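
To see the variance point in isolation, a sketch with the types reduced to
the essentials ('Demo' is just an assumed enclosing class):

  static <V> void strict(Class<? extends Serializer<V>> c) {}
  static <V> void relaxed(Class<? extends Serializer<? super V>> c) {}

  // Demo.<GenericRecord>strict(KafkaAvroSerializer.class);  // does not compile
  Demo.<GenericRecord>relaxed(KafkaAvroSerializer.class);    // compiles, since
  // Serializer<Object> is a Serializer<? super GenericRecord>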

On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sa...@google.com> wrote:

> Thanks both, that's great -
>
> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <ma...@neo4j.com>
> wrote:
>
>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>
>> You sort of get on the wrong track since my favorite IDE suggests:
>>
>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>> KafkaAvroSerializer.class)
>>
>> ... which simply doesn't even compile for me.
>>
>>  incompatible types:
>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>> be converted to java.lang.Class<? extends
>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>
>> It sort of puts you on the wrong footing hence my question.
>> If you don't mind I'll simply create a PR to amend the Javadoc for
>> KafkaIO.
>>
>> https://issues.apache.org/jira/browse/BEAM-13854
>>
>> Easier to figure out was AvroCoder.of(schema) but it might make sense to
>> document that in the same context as well.
>>
>> Thanks again!
>>
>> Cheers,
>> Matt
>>
>>
>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>
>>> Just having a quick look, it looks like the respective interface in
>>> KafkaIO should rather look like this to support KafkaAvroSerializer, which
>>> is a Serializer<Object>:
>>>
>>>
>>>
>>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super
>>> V>> valueSerializer)
>>>
>>>
>>>
>>> Thoughts?
>>>
>>> Cheers, Moritz
>>>
>>>
>>>
>>> *From: *Moritz Mack <mm...@talend.com>
>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>> *To: *dev@beam.apache.org <de...@beam.apache.org>, matt.casters@neo4j.com
>>> <ma...@neo4j.com>
>>> *Subject: *Re: KafkaIO.write and Avro
>>>
>>>
>>> Hi Matt,
>>>
>>>
>>>
>>> Unfortunately, the types don’t play well when using KafkaAvroSerializer.
>>> It currently requires a cast :/
>>>
>>> The following will work:
>>>
>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>
>>>
>>>
>>> This seems to be the cause of repeated confusion, so probably worth
>>> improving the user experience here!
>>>
>>>
>>>
>>> Cheers,
>>>
>>> Moritz
>>>
>>>
>>>
>>> *From: *Matt Casters <ma...@neo4j.com>
>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>> *To: *Beam Development List <de...@beam.apache.org>
>>> *Subject: *KafkaIO.write and Avro
>>>
>>>
>>> Dear Beams,
>>>
>>>
>>>
>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>> specify option value.serializer as
>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>>> bunch of other options for authentication and so on verifies the schema
>>> stored in the Avro record with a schema registry.   Unfortunately, I
>>> couldn't figure out how to pass this serializer class to KafkaIO.write() as
>>> it's not acceptable to the withValueSerializer() method.
>>>
>>>
>>>
>>> For KafkaIO.read() we made a specific provision in the form of
>>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>>> covered the producer side of Avro values yet.
>>>
>>>
>>>
>>> I'd be happy to dive into the code to add proper support for a Confluent
>>> schema registry in KafkaIO.write() but I was just wondering if there was
>>> something I might have overlooked.  It's hard to find samples or
>>> documentation on producing Avro messages with Beam.
>>>
>>>
>>>
>>> Thanks in advance,
>>>
>>> Matt
>>>
>>>
>>>
>>
>> --
>> Neo4j Chief Solutions Architect
>> *✉   *matt.casters@neo4j.com
>>
>>
>>
>>

Re: KafkaIO.write and Avro

Posted by Sachin Agarwal <sa...@google.com>.
Thanks both, that's great -

On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <ma...@neo4j.com> wrote:

> Thanks a lot Moritz.  Your suggestion worked immediately.
>
> You sort of get on the wrong track since my favorite IDE suggests:
>
> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
> KafkaAvroSerializer.class)
>
> ... which simply doesn't even compile for me.
>
>  incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
> be converted to java.lang.Class<? extends
> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>
> It sort of puts you on the wrong footing hence my question.
> If you don't mind I'll simply create a PR to amend the Javadoc for KafkaIO.
>
> https://issues.apache.org/jira/browse/BEAM-13854
>
> Easier to figure out was AvroCoder.of(schema) but it might make sense to
> document that in the same context as well.
>
> Thanks again!
>
> Cheers,
> Matt
>
>
> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>
>> Just having a quick look, it looks like the respective interface in
>> KafkaIO should rather look like this to support KafkaAvroSerializer, which
>> is a Serializer<Object>:
>>
>>
>>
>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super
>> V>> valueSerializer)
>>
>>
>>
>> Thoughts?
>>
>> Cheers, Moritz
>>
>>
>>
>> *From: *Moritz Mack <mm...@talend.com>
>> *Date: *Tuesday, 8. February 2022 at 15:55
>> *To: *dev@beam.apache.org <de...@beam.apache.org>, matt.casters@neo4j.com <
>> matt.casters@neo4j.com>
>> *Subject: *Re: KafkaIO.write and Avro
>>
>>
>> Hi Matt,
>>
>>
>>
>> Unfortunately, the types don’t play well when using KafkaAvroSerializer.
>> It currently requires a cast :/
>>
>> The following will work:
>>
>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>
>>
>>
>> This seems to be the cause of repeated confusion, so probably worth
>> improving the user experience here!
>>
>>
>>
>> Cheers,
>>
>> Moritz
>>
>>
>>
>> *From: *Matt Casters <ma...@neo4j.com>
>> *Date: *Tuesday, 8. February 2022 at 14:17
>> *To: *Beam Development List <de...@beam.apache.org>
>> *Subject: *KafkaIO.write and Avro
>>
>>
>> Dear Beams,
>>
>>
>>
>> When sending Avro values to Kafka, say GenericRecord, you typically
>> specify option value.serializer as
>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>> bunch of other options for authentication and so on verifies the schema
>> stored in the Avro record with a schema registry.   Unfortunately, I
>> couldn't figure out how to pass this serializer class to KafkaIO.write() as
>> it's not acceptable to the withValueSerializer() method.
>>
>>
>>
>> For KafkaIO.read() we made a specific provision in the form of
>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>> covered the producer side of Avro values yet.
>>
>>
>>
>> I'd be happy to dive into the code to add proper support for a Confluent
>> schema registry in KafkaIO.write() but I was just wondering if there was
>> something I might have overlooked.  It's hard to find samples or
>> documentation on producing Avro messages with Beam.
>>
>>
>>
>> Thanks in advance,
>>
>> Matt
>>
>>
>>
>
> --
> Neo4j Chief Solutions Architect
> *✉   *matt.casters@neo4j.com
>
>
>
>

Re: KafkaIO.write and Avro

Posted by Matt Casters <ma...@neo4j.com>.
Thanks a lot Moritz.  Your suggestion worked immediately.

You sort of get on the wrong track since my favorite IDE suggests:

.withValueSerializer((Class<? extends Serializer<GenericRecord>>)
KafkaAvroSerializer.class)

... which simply doesn't even compile for me.

 incompatible types:
java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
be converted to java.lang.Class<? extends
org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>

It sort of puts you on the wrong footing, hence my question.
If you don't mind, I'll simply create a PR to amend the Javadoc for KafkaIO.

https://issues.apache.org/jira/browse/BEAM-13854

AvroCoder.of(schema) was easier to figure out, but it might make sense to
document that in the same context as well.
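
For reference, the combination that ended up working looks roughly like this
(broker, topic, schema and registry URL are placeholders; ImmutableMap is
Guava's):

  PCollection<KV<String, GenericRecord>> records = ...; // value coder: AvroCoder.of(schema)

  records.apply(KafkaIO.<String, GenericRecord>write()
      .withBootstrapServers("broker:9092")
      .withTopic("my_topic")
      .withKeySerializer(StringSerializer.class)
      .withValueSerializer((Class) KafkaAvroSerializer.class) // the cast from above
      .withProducerConfigUpdates(
          ImmutableMap.<String, Object>of("schema.registry.url", "http://localhost:8081")));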

Thanks again!

Cheers,
Matt


On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:

> Just having a quick look, it looks like the respective interface in
> KafkaIO should rather look like this to support KafkaAvroSerializer, which
> is a Serializer<Object>:
>
>
>
> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super
> V>> valueSerializer)
>
>
>
> Thoughts?
>
> Cheers, Moritz
>
>
>
> *From: *Moritz Mack <mm...@talend.com>
> *Date: *Tuesday, 8. February 2022 at 15:55
> *To: *dev@beam.apache.org <de...@beam.apache.org>, matt.casters@neo4j.com <
> matt.casters@neo4j.com>
> *Subject: *Re: KafkaIO.write and Avro
>
>
> Hi Matt,
>
>
>
> Unfortunately, the types don’t play well when using KafkaAvroSerializer.
> It currently requires a cast :/
>
> The following will work:
>
> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>
>
>
> This seems to be the cause of repeated confusion, so probably worth
> improving the user experience here!
>
>
>
> Cheers,
>
> Moritz
>
>
>
> *From: *Matt Casters <ma...@neo4j.com>
> *Date: *Tuesday, 8. February 2022 at 14:17
> *To: *Beam Development List <de...@beam.apache.org>
> *Subject: *KafkaIO.write and Avro
>
>
> Dear Beams,
>
>
>
> When sending Avro values to Kafka, say GenericRecord, you typically
> specify option value.serializer as
> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
> bunch of other options for authentication and so on verifies the schema
> stored in the Avro record with a schema registry.   Unfortunately, I
> couldn't figure out how to pass this serializer class to KafkaIO.write() as
> it's not acceptable to the withValueSerializer() method.
>
>
>
> For KafkaIO.read() we made a specific provision in the form of
> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
> covered the producer side of Avro values yet.
>
>
>
> I'd be happy to dive into the code to add proper support for a Confluent
> schema registry in KafkaIO.write() but I was just wondering if there was
> something I might have overlooked.  It's hard to find samples or
> documentation on producing Avro messages with Beam.
>
>
>
> Thanks in advance,
>
> Matt
>
>
>

-- 
Neo4j Chief Solutions Architect
*✉   *matt.casters@neo4j.com

Re: KafkaIO.write and Avro

Posted by Moritz Mack <mm...@talend.com>.
Just having a quick look, it looks like the respective interface in KafkaIO should rather look like this to support KafkaAvroSerializer, which is a Serializer<Object>:

public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super V>> valueSerializer)
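
With that bound the call compiles without any cast, e.g.:

KafkaIO.<String, GenericRecord>write()
    .withValueSerializer(KafkaAvroSerializer.class) // Serializer<Object> fits '? super GenericRecord'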

Thoughts?
Cheers, Moritz

From: Moritz Mack <mm...@talend.com>
Date: Tuesday, 8. February 2022 at 15:55
To: dev@beam.apache.org <de...@beam.apache.org>, matt.casters@neo4j.com <ma...@neo4j.com>
Subject: Re: KafkaIO.write and Avro
Hi Matt,

Unfortunately, the types don’t play well when using KafkaAvroSerializer. It currently requires a cast :/
The following will work:
write.withValueSerializer((Class) KafkaAvroSerializer.class)

This seems to be the cause of repeated confusion, so probably worth improving the user experience here!

Cheers,
Moritz

From: Matt Casters <ma...@neo4j.com>
Date: Tuesday, 8. February 2022 at 14:17
To: Beam Development List <de...@beam.apache.org>
Subject: KafkaIO.write and Avro
Dear Beams,

When sending Avro values to Kafka, say GenericRecord, you typically specify option value.serializer as "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a bunch of other options for authentication and so on verifies the schema stored in the Avro record with a schema registry.   Unfortunately, I couldn't figure out how to pass this serializer class to KafkaIO.write() as it's not acceptable to the withValueSerializer() method.

For KafkaIO.read() we made a specific provision in the form of class ConfluentSchemaRegistryDeserializer but it doesn't look like we covered the producer side of Avro values yet.

I'd be happy to dive into the code to add proper support for a Confluent schema registry in KafkaIO.write() but I was just wondering if there was something I might have overlooked.  It's hard to find samples or documentation on producing Avro messages with Beam.

Thanks in advance,

Matt





Re: KafkaIO.write and Avro

Posted by Moritz Mack <mm...@talend.com>.
Hi Matt,

Unfortunately, the types don’t play well when using KafkaAvroSerializer. It currently requires a cast :/
The following will work:
write.withValueSerializer((Class) KafkaAvroSerializer.class)
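
Or, if you'd rather confine the unchecked cast to one spot, something like this should work (sketch):

@SuppressWarnings({"unchecked", "rawtypes"})
Class<? extends Serializer<GenericRecord>> avroSerializer = (Class) KafkaAvroSerializer.class;
write = write.withValueSerializer(avroSerializer);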

This seems to be the cause of repeated confusion, so probably worth improving the user experience here!

Cheers,
Moritz

From: Matt Casters <ma...@neo4j.com>
Date: Tuesday, 8. February 2022 at 14:17
To: Beam Development List <de...@beam.apache.org>
Subject: KafkaIO.write and Avro
Dear Beams,

When sending Avro values to Kafka, say GenericRecord, you typically specify option value.serializer as "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a bunch of other options for authentication and so on verifies the schema stored in the Avro record with a schema registry.   Unfortunately, I couldn't figure out how to pass this serializer class to KafkaIO.write() as it's not acceptable to the withValueSerializer() method.

For KafkaIO.read() we made a specific provision in the form of class ConfluentSchemaRegistryDeserializer but it doesn't look like we covered the producer side of Avro values yet.

I'd be happy to dive into the code to add proper support for a Confluent schema registry in KafkaIO.write() but I was just wondering if there was something I might have overlooked.  It's hard to find samples or documentation on producing Avro messages with Beam.

Thanks in advance,

Matt





Re: KafkaIO.write and Avro

Posted by Matt Casters <ma...@neo4j.com>.
So yes, reading the generic records with a consumer worked great. It's
really convenient to have a way of handling both the coder and the
deserializer at once.

To test, I hooked KafkaIO up to a free Confluent Cloud service with schema
registry. Reading works great, and once I have my next fixes ready for that
wretched Neo4jIO, the generic solution to all of this will go into
apache/hop master.  Maybe it can serve as a sample of sorts.

The problem is really on the write/producer side.  Back on the subject of
that coder, and considering the crazy things folks will do with Apache
Hop, the ability to specify a coder for class GenericRecord
(AvroCoder.of(schema)) in a pipeline only works if you're sending to one
topic with one KafkaIO.write instance and a single schema. This setup
would fall apart if you wanted to write to multiple topics in the same
pipeline.

So perhaps that scenario would make the case for having a
ConfluentSchemaRegistrySerializer facility.
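
Something shaped like the read side, purely hypothetical (no such class
exists in Beam today):

KafkaIO.<String, GenericRecord>write()
    .withBootstrapServers("broker:9092")
    .withTopic("topic_a")
    .withValueSerializer(
        ConfluentSchemaRegistrySerializerProvider.of("http://localhost:8081", "topic_a-value"))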

Cheers,

Matt

On Wed, Feb 9, 2022 at 19:18 Alexey Romanenko <ar...@gmail.com> wrote:

>
> On 8 Feb 2022, at 14:16, Matt Casters <ma...@neo4j.com> wrote:
>
> For KafkaIO.read() we made a specific provision in the form of
> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
> covered the producer side of Avro values yet.
>
>
> Talking about read from Kafka with Avro and Confluent Schema Registry,
> have you tried to use an API that KafkaIO provides [1] using
> DeserializerProvider?
>
> It would be something like this (a code snippet from KafkaIO Javadoc):
>
> KafkaIO.<Long, GenericRecord>read()
>       .withBootstrapServers("broker_1:9092,broker_2:9092")
>       .withTopic("my_topic")
>       .withKeyDeserializer(LongDeserializer.class)
>       // Use Confluent Schema Registry, specify schema registry URL and value subject
>       .withValueDeserializer(
>           ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"));
>
> I think we can add the similar API for write part as well to make it more
> easy-to-use for users.
>
>
>   I'd be happy to dive into the code to add proper support for a Confluent
> schema registry in KafkaIO.write() but I was just wondering if there was
> something I might have overlooked.  It's hard to find samples or
> documentation on producing Avro messages with Beam.
>
>
> I agree that we have a good field for improvement here but, tbh, KafkaIO
> Javadoc contains a dedicated section for that [2] (see “Use Avro schema
> with Confluent Schema Registry”).
>
> —
> Alexey
>
>
> [1]
> https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withValueDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
> [2]
> https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>
>
>
>
>

Re: KafkaIO.write and Avro

Posted by Alexey Romanenko <ar...@gmail.com>.
> On 8 Feb 2022, at 14:16, Matt Casters <ma...@neo4j.com> wrote:
> 
> For KafkaIO.read() we made a specific provision in the form of class ConfluentSchemaRegistryDeserializer but it doesn't look like we covered the producer side of Avro values yet.

Speaking of reading from Kafka with Avro and the Confluent Schema Registry, have you tried the API that KafkaIO provides [1] via DeserializerProvider?

It would be something like this (a code snippet from KafkaIO Javadoc):

KafkaIO.<Long, GenericRecord>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      // Use Confluent Schema Registry, specify schema registry URL and value subject
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"));
I think we can add a similar API for the write part as well to make it easier to use.


>   I'd be happy to dive into the code to add proper support for a Confluent schema registry in KafkaIO.write() but I was just wondering if there was something I might have overlooked.  It's hard to find samples or documentation on producing Avro messages with Beam.

I agree that there is room for improvement here but, tbh, the KafkaIO Javadoc contains a dedicated section for that [2] (see “Use Avro schema with Confluent Schema Registry”). 

—
Alexey


[1] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withValueDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
[2] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.html