Posted to dev@beam.apache.org by Lourens Naude <lo...@shopify.com> on 2020/08/21 15:20:02 UTC

Suggestion to let KafkaIO support the deserializer API with headers

Hi everyone,

We bumped into an API issue with the deserializer invoked when constructing
KafkaRecord instances in the KafkaIO module.

I wanted to float this past the mailing list for discussion first before
exploring further.

The call site in question: KafkaIO only invokes the deserializer through the
simplified API that does not include the Kafka record headers, even though
they are available to pass as an argument:
https://github.com/apache/beam/blob/release-2.20.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L202-L203
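
To make the difference concrete, here is a minimal sketch of the two call
shapes (class, method and variable names are made up for illustration; this
is not the actual KafkaUnboundedReader code):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch only: contrasts the simplified and header-aware Deserializer calls.
final class DeserializeCallShapes {

  // What KafkaIO effectively does today: the simplified API, so headers never
  // reach the deserializer even though rawRecord.headers() is available.
  static <V> V withoutHeaders(
      ConsumerRecord<byte[], byte[]> rawRecord, Deserializer<V> valueDeserializer) {
    return valueDeserializer.deserialize(rawRecord.topic(), rawRecord.value());
  }

  // The proposed change: forward the record headers via the header-aware API
  // (kafka-clients 2.1+); its default implementation falls back to the
  // simplified call, so existing deserializers keep working unchanged.
  static <V> V withHeaders(
      ConsumerRecord<byte[], byte[]> rawRecord, Deserializer<V> valueDeserializer) {
    return valueDeserializer.deserialize(
        rawRecord.topic(), rawRecord.headers(), rawRecord.value());
  }
}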

Our SerDes implementation relies on Kafka headers support, which was added
to Kafka records via KIP-82 as a means to carry metadata cleanly rather than
abusing keys or values for that purpose.

It is also a valid Deserializer API as per the official Kafka interface:

*
https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59-L61
* The header-aware method's default implementation delegates to the
simplified version (the method every implementation must still provide) in
https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L60
* The default behaviour is thus backwards compatible, with a preference for
the header-specific API (see the interface sketch after this list)
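
For reference, an abridged sketch of that interface as it looks in
kafka-clients 2.1+ (paraphrased from the linked source and trimmed to the
relevant parts):

import java.io.Closeable;
import java.util.Map;
import org.apache.kafka.common.header.Headers;

// Abridged from org.apache.kafka.common.serialization.Deserializer (2.1+).
public interface Deserializer<T> extends Closeable {

  default void configure(Map<String, ?> configs, boolean isKey) {}

  // The simplified API: the one method a concrete implementation must provide.
  T deserialize(String topic, byte[] data);

  // The header-aware API: its default implementation drops the headers and
  // delegates to the simplified call, which keeps the change backwards
  // compatible for deserializers that ignore headers.
  default T deserialize(String topic, Headers headers, byte[] data) {
    return deserialize(topic, data);
  }

  @Override
  default void close() {}
}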

We've used the custom SerDes without issues in a complex Connect and
Streams pipeline, but ran into this divergence in KafkaIO, which does not
prefer the header-aware deserializer API as the primary deserialization
mechanism.
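
To illustrate the kind of deserializer involved, here is a minimal,
hypothetical example that keys its decoding off a "format" header (the
header name and the base64 logic are invented for illustration; our actual
SerDes differs):

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical example: picks a decoding strategy based on a "format" header.
public class HeaderAwareStringDeserializer implements Deserializer<String> {

  @Override
  public String deserialize(String topic, byte[] data) {
    // Simplified API (all KafkaIO offers today): no metadata is available,
    // so we can only assume a default encoding.
    return data == null ? null : new String(data, StandardCharsets.UTF_8);
  }

  @Override
  public String deserialize(String topic, Headers headers, byte[] data) {
    // Header-aware API: metadata travels in headers instead of being
    // smuggled into the key or value bytes.
    Header format = headers == null ? null : headers.lastHeader("format");
    if (data != null
        && format != null
        && "base64".equals(new String(format.value(), StandardCharsets.UTF_8))) {
      return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
    }
    return deserialize(topic, data);
  }
}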

The same API is used elsewhere:

* It's the default for the stock Java consumer:
https://github.com/apache/kafka/blob/4cd2396db31418c90005c998d9107ad40df055b2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1362
(the header-aware call falls back to the simplified API by default)
* Ditto Kafka Connect:
https://github.com/apache/kafka/blob/b399a731a39c28bdd89998edc7c9fd732c56eee1/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L48-L64
* And Kafka Streams:
https://github.com/apache/kafka/blob/92828d53b18703000159f4dd7dc8b3170667db25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java#L65-L66

Any thoughts on the proposed change to pass the additional headers argument
on deserialization?

Best,
Lourens

Re: Suggestion to let KafkaIO support the deserializer API with headers

Posted by Lourens Naude <lo...@shopify.com>.
Thanks Luke,

I took a stab at this in https://issues.apache.org/jira/browse/BEAM-10865 and
also outlined there, in some detail, the avenues explored across the range of
Kafka client APIs to support, and how record headers and eventually the
(Extended)Deserializer APIs evolved. Tested with kafka-clients 1.0.0 through
the latest release.
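
For anyone following along, the compatibility wrinkle in short: on
kafka-clients 2.1+ the header-aware deserialize lives on Deserializer itself
(KIP-336), while pre-2.1 clients exposed it, if at all, only through the
separate ExtendedDeserializer interface. A rough, illustrative sketch of one
way to tolerate both (not the code that landed for BEAM-10865):

import java.lang.reflect.Method;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative helper: prefer the header-aware deserialize when the
// deserializer's class exposes it, otherwise fall back to the simplified API.
final class HeaderAwareDeserializeCompat {

  static <T> T deserialize(
      Deserializer<T> deserializer, String topic, Headers headers, byte[] data) {
    final Method withHeaders;
    try {
      withHeaders =
          deserializer
              .getClass()
              .getMethod("deserialize", String.class, Headers.class, byte[].class);
    } catch (NoSuchMethodException e) {
      // Plain deserializer on an older client: headers cannot be forwarded.
      return deserializer.deserialize(topic, data);
    }
    try {
      @SuppressWarnings("unchecked")
      T result = (T) withHeaders.invoke(deserializer, topic, headers, data);
      return result;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Header-aware deserialize failed", e);
    }
  }
}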

Best,
Lourens

On Fri, Aug 21, 2020 at 5:06 PM Luke Cwik <lc...@google.com> wrote:

> Sounds good.
>
> Note that you'll also want to update ReadFromKafkaDoFn[1] and provide
> tests that cover both to make sure we don't regress and stop providing
> headers.
>
> 1:
> https://github.com/apache/beam/blob/cfa448d121297398312d09c531258a72b413488b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L309
>
> On Fri, Aug 21, 2020 at 8:29 AM Lourens Naude <lo...@shopify.com>
> wrote:
>
>> Hi everyone,
>>
>> We bumped into an API issue with the deserializer invoked when constructing
>> KafkaRecord instances in the KafkaIO module.
>>
>> I wanted to float this past the mailing list for discussion first before
>> exploring further.
>>
>> The call site in question: KafkaIO only invokes the deserializer through the
>> simplified API that does not include the Kafka record headers, even though
>> they are available to pass as an argument:
>> https://github.com/apache/beam/blob/release-2.20.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L202-L203
>>
>> Our SerDes implementation relies on Kafka headers support, which was added
>> to Kafka records via KIP-82 as a means to carry metadata cleanly rather than
>> abusing keys or values for that purpose.
>>
>> It is also a valid Deserializer API as per the official Kafka interface:
>>
>> *
>> https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59-L61
>> * The header-aware method's default implementation delegates to the
>> simplified version (the method every implementation must still provide) in
>> https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L60
>> * The default behaviour is thus backwards compatible, with a preference
>> for the header-specific API
>>
>> We've used the custom SerDes without issues in a complex Connect and
>> Streams pipeline, but ran into this divergence in KafkaIO, which does not
>> prefer the header-aware deserializer API as the primary deserialization mechanism.
>>
>> The same API is used elsewhere:
>>
>> * It's the default for the stock Java consumer:
>> https://github.com/apache/kafka/blob/4cd2396db31418c90005c998d9107ad40df055b2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1362
>> (the header-aware call falls back to the simplified API by default)
>> * Ditto Kafka Connect:
>> https://github.com/apache/kafka/blob/b399a731a39c28bdd89998edc7c9fd732c56eee1/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L48-L64
>> * And Kafka Streams:
>> https://github.com/apache/kafka/blob/92828d53b18703000159f4dd7dc8b3170667db25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java#L65-L66
>>
>> Any thoughts on the proposed change to pass the additional headers argument
>> on deserialization?
>>
>> Best,
>> Lourens
>>
>

Re: Suggestion to let KafkaIO support the deserializer API with headers

Posted by Luke Cwik <lc...@google.com>.
Sounds good.

Note that you'll also want to update ReadFromKafkaDoFn[1] and provide tests
that cover both to make sure we don't regress and stop providing headers.

1:
https://github.com/apache/beam/blob/cfa448d121297398312d09c531258a72b413488b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L309
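
One way to cover that, sketched loosely: a test-only deserializer that fails
unless it is invoked through the header-aware API, usable from both the
KafkaUnboundedReader and ReadFromKafkaDoFn test paths (the class and the
header expectations below are invented for illustration, not the tests that
were actually added):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Test-only deserializer: succeeds only when the reader forwards record headers.
public class HeadersRequiredDeserializer implements Deserializer<String> {

  @Override
  public String deserialize(String topic, byte[] data) {
    // Reaching this overload means headers were dropped somewhere along the way.
    throw new AssertionError("expected deserialize(topic, headers, data) to be used");
  }

  @Override
  public String deserialize(String topic, Headers headers, byte[] data) {
    if (headers == null || !headers.iterator().hasNext()) {
      throw new AssertionError("expected at least one record header to be forwarded");
    }
    return new String(data, StandardCharsets.UTF_8);
  }
}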

On Fri, Aug 21, 2020 at 8:29 AM Lourens Naude <lo...@shopify.com>
wrote:

> Hi everyone,
>
> We bumped into an API issue with the deserializer invoked when constructing
> KafkaRecord instances in the KafkaIO module.
>
> I wanted to float this past the mailing list for discussion first before
> exploring further.
>
> The call site in question: KafkaIO only invokes the deserializer through the
> simplified API that does not include the Kafka record headers, even though
> they are available to pass as an argument:
> https://github.com/apache/beam/blob/release-2.20.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L202-L203
>
> Our SerDes implementation relies on Kafka headers support, which was added
> to Kafka records via KIP-82 as a means to carry metadata cleanly rather than
> abusing keys or values for that purpose.
>
> It is also a valid Deserializer API as per the official Kafka interface:
>
> *
> https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59-L61
> * The header-aware method's default implementation delegates to the
> simplified version (the method every implementation must still provide) in
> https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L60
> * The default behaviour is thus backwards compatible, with a preference
> for the header-specific API
>
> We've used the custom SerDes without issues in a complex Connect and
> Streams pipeline, but ran into this divergence in KafkaIO, which does not
> prefer the header-aware deserializer API as the primary deserialization mechanism.
>
> The same API is used elsewhere:
>
> * It's the default for the stock Java consumer:
> https://github.com/apache/kafka/blob/4cd2396db31418c90005c998d9107ad40df055b2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1362
> (the header-aware call falls back to the simplified API by default)
> * Ditto Kafka Connect:
> https://github.com/apache/kafka/blob/b399a731a39c28bdd89998edc7c9fd732c56eee1/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L48-L64
> * And Kafka Streams:
> https://github.com/apache/kafka/blob/92828d53b18703000159f4dd7dc8b3170667db25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java#L65-L66
>
> Any thoughts on the proposed change to pass the additional headers argument
> on deserialization?
>
> Best,
> Lourens
>