Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/09/09 10:02:00 UTC

[jira] [Work logged] (BEAM-10865) Support for Kafka deserialization API with headers (since Kafka API 2.1.0)

     [ https://issues.apache.org/jira/browse/BEAM-10865?focusedWorklogId=480710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-480710 ]

ASF GitHub Bot logged work on BEAM-10865:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Sep/20 10:01
            Start Date: 09/Sep/20 10:01
    Worklog Time Spent: 10m 
      Work Description: methodmissing opened a new pull request #12794:
URL: https://github.com/apache/beam/pull/12794


   TL;DR: Let KafkaIO support the deserializer API with headers.
   
     References mailing list posts:
   
     * Original https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E
     * Reply from Luke https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E
   
   ### Design decisions
   
   The reason for using SpEL is that, with a kafka-clients dependency < 2.1.0, compilation fails with:
   
   ```
     required: String,byte[]
     found: String,Headers,byte[]
     reason: actual and formal argument lists differ in length
     where T is a type-variable:
   ```
   
   This is because the headers-aware default API only landed in 2.1.0 via https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced
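   
   For context, a paraphrased sketch of the `Deserializer` shape this hinges on (`configure`/`close` omitted). The 2.1.0+ default method simply drops the headers and delegates, which is why deserializers written against the older API keep working:
   
   ```java
   import org.apache.kafka.common.header.Headers;
   
   // Paraphrased shape of org.apache.kafka.common.serialization.Deserializer
   // in kafka-clients 2.1.0+. Before 2.1.0 only the two-argument method
   // exists, hence the "argument lists differ in length" compile error above
   // when calling the three-argument form against an older client.
   public interface Deserializer<T> {
   
     T deserialize(String topic, byte[] data);
   
     // Added in 2.1.0; the default ignores the headers and delegates to the
     // two-argument variant unless a deserializer overrides it.
     default T deserialize(String topic, Headers headers, byte[] data) {
       return deserialize(topic, data);
     }
   }
   ```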
   
   I opted for `ConsumerSpEL#deserializeKey` and `ConsumerSpEL#deserializeValue` as the API to ensure forward-looking consistency for both `KafkaUnboundedReader` and `ReadFromKafkaDoFn`, as both already depend on a `ConsumerSpEL` instance.
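   
   Roughly, the indirection looks like the sketch below. This is illustrative only and does not mirror the exact `ConsumerSpEL` internals: the headers-aware method is detected once via reflection, and when present the call is routed through a SpEL expression so the code still compiles against pre-2.1.0 clients:
   
   ```java
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.header.Headers;
   import org.apache.kafka.common.serialization.Deserializer;
   import org.springframework.expression.Expression;
   import org.springframework.expression.spel.standard.SpelExpressionParser;
   import org.springframework.expression.spel.support.StandardEvaluationContext;
   
   // Illustrative sketch only; the real ConsumerSpEL differs in naming and detail.
   class HeadersAwareDeserializeSketch {
   
     private static final SpelExpressionParser PARSER = new SpelExpressionParser();
   
     // Parsed once; evaluated per record (the expensive part, see next section).
     private final Expression deserializeWithHeaders =
         PARSER.parseExpression("#deserializer.deserialize(#topic, #headers, #data)");
   
     // Detected once: does this kafka-clients version expose
     // deserialize(String, Headers, byte[])?
     private final boolean hasHeadersApi = detectHeadersApi();
   
     private static boolean detectHeadersApi() {
       try {
         Deserializer.class.getMethod(
             "deserialize", String.class, Headers.class, byte[].class);
         return true;
       } catch (NoSuchMethodException e) {
         return false;
       }
     }
   
     @SuppressWarnings("unchecked")
     <K> K deserializeKey(Deserializer<K> deserializer, ConsumerRecord<byte[], byte[]> record) {
       if (hasHeadersApi) {
         // 2.1.0+: pass the headers through, at the cost of a reflective
         // SpEL invocation on every call.
         StandardEvaluationContext context = new StandardEvaluationContext();
         context.setVariable("deserializer", deserializer);
         context.setVariable("topic", record.topic());
         context.setVariable("headers", record.headers());
         context.setVariable("data", record.key());
         return (K) deserializeWithHeaders.getValue(context);
       }
       // Pre-2.1.0: the two-argument method can be called directly.
       return deserializer.deserialize(record.topic(), record.key());
     }
   }
   ```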
   
   ### Not so great things
   
   Using SpEL for kafka-clients 2.1.0 onwards effectively turns the deserialization path into a more expensive indirection, because the deserializer methods are invoked via reflection (twice per record: once for the key, once for the value):
   
   ```
    	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58) <<<<<<<<<<<
   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) <<<<<<<<<<<
   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) <<<<<<<<<<<
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) <<<<<<<<<<<
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
   	at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
   	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
   	at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
   	at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
   	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
   	at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
   	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
   	at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
   	at org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
   	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
   	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
   ```
   
   This effectively penalizes the more recent Kafka API versions in favor of the older ones. I have not yet measured the overhead.
   
   ### Other avenues explored
   
   For runtime deserialization:
   
   * Naively tried conditional compile options, but the compiler cannot know which kafka-clients version will be used at runtime.
   
   For regression tests (that we don't stop passing headers in the future):
   
   * I tried Mockito and PowerMock implementations on both `LocalDeserializerProvider` and the `Integer` and `Long` deserializers used in the tests, but found the stack to be too deep and backed out of that.
   
   * Ditto for attempting to spy on `ConsumerRecord#headers()` (expecting it to be called twice as often with the newer API), but again the stack was deep and hard to assert on. The call itself is interesting because the `ConsumerRecord` constructor used in the tests is not the one that sets headers, presumably also for client API compatibility.
   
   * Evaluated `ExtendedDeserializer`'s [wrapper](https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html), but `ExtendedDeserializer` is a deprecated API and there is no point in bringing it in as a dependency.
   
   ### How the current regression test works
   
   Since this feature concerns deserialization and the whole test suite depends on the `Integer` (for keys) and `Long` (for values) deserializers, I figured it makes sense to implement a key and value deserializer that can assert on the behaviour. Herein also lies somewhat of a problem: the test case is a bit weak, because I relied on stack frames (the wide array of supported client versions makes anything else very complex) to infer the caller of the `deserialize` method, and unfortunately only class and method name context is provided, with no argument count of 3 or argument types to assert on.
   
   Kafka client API 1.0.0 :
   
   ```
   Frame 0: java.lang.Thread.getStackTrace
   Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
   Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
   Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
   ```
   
   For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning the deserializer was called directly and not via a default or overriding implementation on `Deserializer`. Frames 1 and 2 are equal because of the `super.deserialize` call.
   
   Kafka client API 2.1.0+ :
   ```
   Frame 0: java.lang.Thread.getStackTrace
   Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
   Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
   Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
   ```
   
   For clients 2.1.0 and beyond, frame 3 is `org.apache.kafka.common.serialization.Deserializer#deserialize`. This holds for the bundled deserializers used in the tests because they do not override the headers-aware method, so the call goes through the default implementation on `Deserializer`. In practice frame 3 may instead be an actual overriding implementation.
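   
   To make the mechanics concrete, here is a simplified sketch of such an asserting key deserializer (names, frame handling and assertion style are illustrative; the actual `KafkaIOTest` class differs in detail):
   
   ```java
   import org.apache.kafka.common.serialization.IntegerDeserializer;
   
   // Illustrative sketch only; the actual KafkaIOTest assertor differs in
   // naming, frame handling and assertion style.
   public class CallerAssertingIntegerDeserializer extends IntegerDeserializer {
   
     @Override
     public Integer deserialize(String topic, byte[] data) {
       StackTraceElement[] frames = Thread.currentThread().getStackTrace();
       // Walk past Thread.getStackTrace and our own frames to the external caller.
       String caller = null;
       for (StackTraceElement frame : frames) {
         String cls = frame.getClassName();
         if (!cls.equals(Thread.class.getName()) && !cls.equals(getClass().getName())) {
           caller = cls + "#" + frame.getMethodName();
           break;
         }
       }
       // kafka-clients 2.1.0+: invoked via the headers-aware default method.
       boolean viaHeadersDefault =
           "org.apache.kafka.common.serialization.Deserializer#deserialize".equals(caller);
       // Pre-2.1.0: ConsumerSpEL calls the two-argument method directly.
       boolean direct =
           "org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey".equals(caller);
       if (!(viaHeadersDefault || direct)) {
         throw new AssertionError("deserialize called from unexpected frame: " + caller);
       }
       return super.deserialize(topic, data);
     }
   }
   ```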
   
   ### Feedback items for me
   
   * Any alternatives to the SpEL evaluation for this hot-path API? `consumer.seekToEnd` and `consumer.assign` are one-off / periodic APIs and are not called as often as twice per record. (One possible direction is sketched below, after this list.)
   * Ideas for a better way to test for regressions?
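   
   For the first item, one direction that comes to mind (not what this PR implements, purely a sketch under that assumption) is resolving the headers-aware method once and caching a `MethodHandle`. It keeps compile-time compatibility with pre-2.1.0 clients, but avoids parsing and evaluating a SpEL expression per record:
   
   ```java
   import java.lang.invoke.MethodHandle;
   import java.lang.invoke.MethodHandles;
   import java.lang.invoke.MethodType;
   import org.apache.kafka.common.header.Headers;
   import org.apache.kafka.common.serialization.Deserializer;
   
   // Hypothetical alternative to per-call SpEL evaluation: look up the
   // headers-aware deserialize method once and cache a MethodHandle.
   final class CachedHeadersDeserialize {
   
     // Null when running against kafka-clients < 2.1.0.
     private static final MethodHandle DESERIALIZE_WITH_HEADERS = lookup();
   
     private static MethodHandle lookup() {
       try {
         return MethodHandles.publicLookup().findVirtual(
             Deserializer.class,
             "deserialize",
             MethodType.methodType(Object.class, String.class, Headers.class, byte[].class));
       } catch (NoSuchMethodException | IllegalAccessException e) {
         return null;
       }
     }
   
     @SuppressWarnings("unchecked")
     static <T> T deserialize(
         Deserializer<T> deserializer, String topic, Headers headers, byte[] data) {
       if (DESERIALIZE_WITH_HEADERS == null) {
         // Pre-2.1.0 client: fall back to the two-argument method.
         return deserializer.deserialize(topic, data);
       }
       try {
         return (T) DESERIALIZE_WITH_HEADERS.invoke(deserializer, topic, headers, data);
       } catch (Throwable t) {
         throw new RuntimeException(t);
       }
     }
   }
   ```
   
   Whether this is meaningfully cheaper than the SpEL path on the hot path would of course need measuring.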
   
   ### Questions
   
   * Would it make sense to consider raising the minimum supported client API in order to drop the SpEL indirection for deserialization?
   * If this implementation (and very likely iterations thereof :-)) lands, would support for the same API on serialization be appreciated as well?
   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 480710)
    Remaining Estimate: 23h 50m  (was: 24h)
            Time Spent: 10m

> Support for Kafka deserialization API with headers (since Kafka API 2.1.0)
> --------------------------------------------------------------------------
>
>                 Key: BEAM-10865
>                 URL: https://issues.apache.org/jira/browse/BEAM-10865
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Lourens Naudé
>            Priority: P2
>              Labels: KafkaIO, kafka
>   Original Estimate: 24h
>          Time Spent: 10m
>  Remaining Estimate: 23h 50m
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)