Posted to issues@beam.apache.org by "Lourens Naudé (Jira)" <ji...@apache.org> on 2020/09/09 09:54:00 UTC
[jira] [Updated] (BEAM-10865) Support for Kafka deserialization API with headers (since Kafka API 2.1.0)
[ https://issues.apache.org/jira/browse/BEAM-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lourens Naudé updated BEAM-10865:
---------------------------------
Description:
References mailing list posts:
* Original [https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E]
* Reply from Luke [https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E]
*Design decisions*
SpEL is used because, with kafka-clients < 2.1.0 as a dependency, compilation fails with:
```
required: String,byte[]
found: String,Headers,byte[]
reason: actual and formal argument lists differ in length
where T is a type-variable:
```
This is because the default `deserialize` method that accepts headers only landed in 2.1.0 via [https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced]
I opted for `ConsumerSpEL#deserializeKey` and `ConsumerSpEL#deserializeValue` as the API to ensure forward-looking consistency for both `KafkaUnboundedReader` and `ReadFromKafkaDoFn`, as both already depend on a `ConsumerSpEL` instance.
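To make the version difference concrete, here is a minimal sketch of the two `Deserializer` shapes and a runtime probe for the headers overload. The nested interfaces are hypothetical stand-ins for the Kafka types so that the sketch compiles without kafka-clients, and `HeadersSupportProbe` and `hasHeadersOverload` are names invented for illustration, not part of the patch.

```java
final class HeadersSupportProbe {
  // Hypothetical stand-in for org.apache.kafka.common.header.Headers.
  interface Headers {}

  // Shape of Deserializer before 2.1.0: only the two-argument method exists.
  interface OldDeserializer<T> {
    T deserialize(String topic, byte[] data);
  }

  // Shape of Deserializer from 2.1.0 onwards: a default overload accepts headers.
  interface NewDeserializer<T> {
    T deserialize(String topic, byte[] data);

    default T deserialize(String topic, Headers headers, byte[] data) {
      return deserialize(topic, data);
    }
  }

  /** Returns true if the type exposes deserialize(String, Headers, byte[]). */
  static boolean hasHeadersOverload(Class<?> deserializerType) {
    try {
      deserializerType.getMethod("deserialize", String.class, Headers.class, byte[].class);
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(hasHeadersOverload(OldDeserializer.class)); // false
    System.out.println(hasHeadersOverload(NewDeserializer.class)); // true
  }
}
```

A direct call to the three-argument method cannot be compiled against the old shape, which is why the decision has to be deferred to runtime in some form.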
*Not so great things*
Using SpEL for kafka-clients 2.1.0 onwards effectively turns the deserialization path into a more expensive indirection, because the deserializer methods are called via reflection (twice per record: once for the key, once for the value):
```
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58) <<<<<<<<<<<
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) <<<<<<<<<<<
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) <<<<<<<<<<<
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) <<<<<<<<<<<
at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
at org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
```
This effectively penalises the more recent Kafka API versions in favor of the older ones. I have not yet measured the overhead.
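One possible direction, sketched here under assumptions and not measured either, is to resolve the headers overload once and keep a `java.lang.invoke.MethodHandle` for the per-record calls, falling back to the two-argument method when the overload is absent. The nested `Headers` and `Deserializer` interfaces are hypothetical stand-ins for the Kafka types, `CachedHeadersDispatch` is an invented name, and the sketch assumes the `Headers` type itself is available at compile time.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.charset.StandardCharsets;

final class CachedHeadersDispatch {
  // Hypothetical stand-ins for the Kafka types (2.1.0+ shape).
  interface Headers {}

  interface Deserializer<T> {
    T deserialize(String topic, byte[] data);

    default T deserialize(String topic, Headers headers, byte[] data) {
      return deserialize(topic, data);
    }
  }

  // Resolved once at class initialisation; null if the overload does not exist.
  private static final MethodHandle WITH_HEADERS = lookupHeadersOverload();

  private static MethodHandle lookupHeadersOverload() {
    try {
      // The generic return type T erases to Object in the method descriptor.
      return MethodHandles.lookup().findVirtual(
          Deserializer.class,
          "deserialize",
          MethodType.methodType(Object.class, String.class, Headers.class, byte[].class));
    } catch (NoSuchMethodException | IllegalAccessException e) {
      return null;
    }
  }

  @SuppressWarnings("unchecked")
  static <T> T deserialize(Deserializer<T> d, String topic, Headers headers, byte[] data) {
    if (WITH_HEADERS == null) {
      return d.deserialize(topic, data); // older client: no headers overload
    }
    try {
      return (T) WITH_HEADERS.invoke(d, topic, headers, data);
    } catch (RuntimeException | Error e) {
      throw e;
    } catch (Throwable t) {
      throw new IllegalStateException(t);
    }
  }

  public static void main(String[] args) {
    Deserializer<String> utf8 = (topic, data) -> new String(data, StandardCharsets.UTF_8);
    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    System.out.println(deserialize(utf8, "topic", new Headers() {}, payload)); // prints "hello"
  }
}
```

Whether this is actually cheaper than the SpEL path on the supported JVMs would need a benchmark; the sketch only shows that the lookup cost can be paid once instead of per record.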
*Other avenues explored*
For runtime deserialization:
* Naively tried conditional compile options, but the compiler cannot know which kafka-clients version will be used at runtime.
For regression tests (to ensure we don't stop passing headers in the future):
* I tried Mockito and Powermock implementations on both `LocalDeserializerProvider` and the Integer and Long serializers in tests, but found the stack to be too deep and backed out of that.
* Ditto for attempting to spy on `ConsumerRecord#headers()` (expecting it to be called twice as often with the newer API), but again the stack is deep and the call is hard to assert on. The call itself is interesting because the `ConsumerRecord` constructor used in the tests is not the one that sets headers, presumably for client API compatibility too.
* Evaluated `ExtendedDeserializer`'s wrapper [https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html], but `ExtendedDeserializer` is a deprecated API and there is no point in bringing that in as a dependency.
*How the current regression test works*
Since this feature concerns deserialization and the whole test suite depends on the `Integer` (for keys) and `Long` (for values) deserializers, I figured it makes sense to implement a key and a value deserializer that can assert the behaviour. Herein also lies somewhat of a problem: the test case is a bit weak because I relied on stack frames to infer the caller of the `deserialize` method (the wide array of supported client versions makes anything else super complex). Unfortunately, a stack frame only provides class and method name context, so there is no argument count of 3, or argument types, to assert on.
Kafka client API 1.0.0:
```
Frame 0: java.lang.Thread.getStackTrace
Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
```
For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning `deserialize` was called directly and not via a default or actual implementation on `Deserializer`. Frames 1 and 2 are equal because of the `super.deserialize` call.
Kafka client API 2.1.0+:
```
Frame 0: java.lang.Thread.getStackTrace
Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
```
For clients 2.1.0 and beyond, frame 3 is `org.apache.kafka.common.serialization.Deserializer#deserialize`. This is true for the bundled deserializers used in the tests because they delegate the call to the implementation on `Deserializer`. In practice, frame 3 may instead refer to an actual overriding implementation.
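The stack-frame approach can be sketched roughly as follows. This is not the actual `KafkaIOTest` code: the nested interfaces are hypothetical stand-ins for the Kafka types, and `CallerFrameProbe` and `RecordingDeserializer` are invented names. Instead of relying on a fixed frame index, the sketch skips frames that belong to `Thread` or to the deserializer class itself, which keeps it indifferent to repeated frames of the kind seen at frames 1 and 2 above.

```java
final class CallerFrameProbe {
  // Hypothetical stand-ins for the Kafka types (2.1.0+ shape).
  interface Headers {}

  interface Deserializer<T> {
    T deserialize(String topic, byte[] data);

    default T deserialize(String topic, Headers headers, byte[] data) {
      return deserialize(topic, data);
    }
  }

  /** Records "Class#method" of the first frame outside this class and Thread. */
  static final class RecordingDeserializer implements Deserializer<Integer> {
    String caller;

    @Override
    public Integer deserialize(String topic, byte[] data) {
      String self = getClass().getName();
      for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
        String cls = frame.getClassName();
        if (cls.equals(Thread.class.getName()) || cls.equals(self)) {
          continue; // skip getStackTrace and our own (possibly repeated) frames
        }
        caller = cls + "#" + frame.getMethodName();
        break;
      }
      return data.length;
    }
  }

  // Mimics the pre-2.1.0 path: the two-argument method is called directly.
  static String callDirectly(RecordingDeserializer d) {
    d.deserialize("topic", new byte[0]);
    return d.caller;
  }

  // Mimics the 2.1.0+ path: the call goes through the default headers overload.
  static String callWithHeaders(RecordingDeserializer d) {
    d.deserialize("topic", new Headers() {}, new byte[0]);
    return d.caller;
  }

  public static void main(String[] args) {
    System.out.println(callDirectly(new RecordingDeserializer()));
    System.out.println(callWithHeaders(new RecordingDeserializer()));
  }
}
```

As noted above, the recorded frame carries only a class and method name, so this still cannot assert on the number or types of the arguments.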
*Feedback items and questions*
* Are there any alternatives to SpEL evaluation for this hot-path API? `consumer.seekToEnd` and `consumer.assign` are one-off or periodic calls and are not invoked as often as twice per record.
* Ideas for a better way to test for regressions?
* Would it make sense to consider raising the minimum supported client API in order to
* If this implementation (and very likely iterations thereof :)) is accepted, would support for the same API on serialization be appreciated as well?
Thanks for any consideration!
> Support for Kafka deserialization API with headers (since Kafka API 2.1.0)
> --------------------------------------------------------------------------
>
> Key: BEAM-10865
> URL: https://issues.apache.org/jira/browse/BEAM-10865
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: Lourens Naudé
> Priority: P2
> Labels: KafkaIO, kafka
> Original Estimate: 24h
> Remaining Estimate: 24h
--
This message was sent by Atlassian Jira
(v8.3.4#803005)