Posted to issues@beam.apache.org by "Lourens Naudé (Jira)" <ji...@apache.org> on 2020/09/09 09:54:00 UTC

[jira] [Updated] (BEAM-10865) Support for Kafka deserialization API with headers (since Kafka API 2.1.0)

     [ https://issues.apache.org/jira/browse/BEAM-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lourens Naudé updated BEAM-10865:
---------------------------------
    Description: 
References mailing list posts:
 * Original [https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E]
 * Reply from Luke [https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E]

*Design decisions*

SpEL is used because, with kafka-clients API < 2.1.0 as a dependency, compilation fails with:

```
 required: String,byte[]
 found: String,Headers,byte[]
 reason: actual and formal argument lists differ in length
 where T is a type-variable:
 ```

This is because the headers-aware default `deserialize` method only landed in 2.1.0, via [https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced]

I opted for `ConsumerSpEL#deserializeKey` and `ConsumerSpEL#deserializeValue` as the API to ensure forward-looking consistency for both `KafkaUnboundedReader` and `ReadFromKafkaDoFn`, as both already depended on a `ConsumerSpEL` instance.
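As an illustrative sketch of the dispatch decision involved (Beam's actual `ConsumerSpEL` uses Spring SpEL expressions; the `DeserializeApiProbe` class below is a hypothetical name): probe the classpath once for the headers-aware overload, then branch per record rather than re-resolving.

```java
// Hypothetical sketch of the version probe behind the SpEL dispatch: look the
// overload up by name so this compiles against any kafka-clients version.
class DeserializeApiProbe {

  // True if className has a public method with the given name and parameter
  // types; classes are resolved by name so none need be on the compile classpath.
  static boolean hasMethod(String className, String methodName, String... paramClassNames) {
    try {
      Class<?> cls = Class.forName(className);
      Class<?>[] params = new Class<?>[paramClassNames.length];
      for (int i = 0; i < params.length; i++) {
        params[i] =
            paramClassNames[i].equals("byte[]") ? byte[].class : Class.forName(paramClassNames[i]);
      }
      cls.getMethod(methodName, params);
      return true;
    } catch (ReflectiveOperationException e) {
      return false;
    }
  }

  // With kafka-clients on the classpath this returns true from 2.1.0 onwards,
  // matching the default method added in the commit linked above.
  static boolean deserializerHasHeadersOverload() {
    return hasMethod(
        "org.apache.kafka.common.serialization.Deserializer",
        "deserialize",
        "java.lang.String",
        "org.apache.kafka.common.header.Headers",
        "byte[]");
  }
}
```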

*Not so great things*

Using SpEL for kafka-clients 2.1.0 onwards turns the deserialization path into a more expensive indirection, since the deserializer methods are invoked via reflection (twice per record: once for the key, once for the value):

```
 at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58) <<<<<<<<<<<
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) <<<<<<<<<<<
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) <<<<<<<<<<<
 at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) <<<<<<<<<<<
 at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
 at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
 at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
 at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
 at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
 at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
 at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
 at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
 at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
 at org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
 at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
 at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
 ```

This effectively penalises the more recent Kafka client versions in favor of the older ones. I have not yet measured the overhead.
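One hedged alternative for this hot path (a sketch only; the class name and shape below are illustrative, not Beam API): resolve the target method once with `java.lang.invoke` and reuse the `MethodHandle` per record, instead of a per-call `Method.invoke` through the SpEL AST.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

// Sketch: resolve once at construction, invoke cheaply per record.
class OnceResolvedCall {
  private final MethodHandle handle;

  OnceResolvedCall(Class<?> owner, String name, MethodType type)
      throws ReflectiveOperationException {
    // publicLookup() resolves the virtual method a single time; per-record
    // calls then avoid repeated reflective lookup and SpEL expression evaluation.
    this.handle = MethodHandles.publicLookup().findVirtual(owner, name, type);
  }

  Object call(Object receiver, Object... args) throws Throwable {
    // invokeWithArguments applies the standard boxing/unboxing conversions.
    return handle.bindTo(receiver).invokeWithArguments(args);
  }
}
```

Applied to `Deserializer#deserialize(String, Headers, byte[])` this would keep the one-time version probe but drop the per-record reflection cost; whether the difference matters in practice would still need measuring.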

*Other avenues explored*

For runtime deserialization:
 * Naively tried conditional compilation options, but the compiler cannot know which kafka-clients version will be used at runtime

For regression tests (that we don't stop passing headers in the future):
 * I tried Mockito and PowerMock implementations for both `LocalDeserializerProvider` and the Integer and Long deserializers in tests, but found the stack to be too deep and backed out of that.

 * Ditto for attempting to spy on `ConsumerRecord#headers()` (expecting it to be called twice as often with the newer API), but again the stack was deep and hard to assert on. The call is interesting in itself, because the `ConsumerRecord` constructor used in the tests is not the one that sets headers, presumably also for client API compatibility.

 * Evaluated `ExtendedDeserializer`'s [Wrapper]([https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html]), but `ExtendedDeserializer` is a deprecated API and there is no point in bringing it in as a dependency

*How the current regression test works*

Given that this feature concerns deserialization, and that the whole test suite depends on the `Integer` (for keys) and `Long` (for values) deserializers, I figured it made sense to implement a key and value deserializer that can assert the behaviour. Herein also lies somewhat of a problem: the test case is a bit weak, because I relied on stack frames (the wide array of supported client versions makes anything else super complex) to infer the caller of the `deserialize` method. Unfortunately, only class and method name context is provided, with no argument count of 3, or argument types, to assert on.

Kafka client API 1.0.0 :

```
 Frame 0: java.lang.Thread.getStackTrace
 Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
 Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
 Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
 ```

For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning the deserializer was called directly and not via a default or overriding implementation on `Deserializer`. Frames 1 and 2 are equal because of the `super.deserialize` call.

 

Kafka client API 2.1.0+ :

```
 Frame 0: java.lang.Thread.getStackTrace
 Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
 Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
 Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
 ```

For clients 2.1.0 and beyond, frame 3 is `org.apache.kafka.common.serialization.Deserializer#deserialize`. This holds for the bundled deserializers used in the tests because they delegate to the default implementation on `Deserializer`. In practice it may instead be an actual overriding implementation.
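The frame listings above reduce to a helper along these lines (a hedged sketch; the actual assertor class in the ticket is `KafkaIOTest$IntegerDeserializerWithHeadersAssertor`):

```java
// Sketch of the caller inference used by the regression test: report stack
// frame n as "Class#method". Only class and method names are available, not
// argument counts or types, which is why the assertion is comparatively weak.
class FrameInspector {
  static String frame(int n) {
    StackTraceElement e = Thread.currentThread().getStackTrace()[n];
    return e.getClassName() + "#" + e.getMethodName();
  }
}
```

Frame 0 is then `java.lang.Thread#getStackTrace`, frame 1 the helper itself, and so on up the call chain; the test compares the relevant frame against `ConsumerSpEL#deserializeKey` or `Deserializer#deserialize` depending on the client version.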

*Feedback items and questions*
 * Any alternatives to the SpEL evaluation for this hot-path API? `consumer.seekToEnd` and `consumer.assign` are one-off / periodic APIs and are not called as often as twice per record.
 * Ideas for a better way to test for regressions?

 * Would it make sense to consider raising the minimum supported client API in order to call the headers-aware `deserialize` directly, without SpEL?
 * If this implementation (and very likely iterations thereof :)) lands, would support for the same header-aware API on serialization be appreciated as well?

Thanks for any consideration!

> Support for Kafka deserialization API with headers (since Kafka API 2.1.0)
> --------------------------------------------------------------------------
>
>                 Key: BEAM-10865
>                 URL: https://issues.apache.org/jira/browse/BEAM-10865
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Lourens Naudé
>            Priority: P2
>              Labels: KafkaIO, kafka
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)