Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/05/15 18:00:02 UTC

[jira] [Updated] (BEAM-10865) Support for Kafka deserialization API with headers (since Kafka API 2.1.0)

     [ https://issues.apache.org/jira/browse/BEAM-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-10865:
-----------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Resolved)

Hello! Due to a bug in our Jira configuration, this issue had status:Resolved but resolution:Unresolved.

I am bulk editing these issues to have resolution:Fixed

If a different resolution is appropriate, please change it. To do this, click the "Resolve" button (you can do this even for closed issues) and set the Resolution field to the right value.

> Support for Kafka deserialization API with headers (since Kafka API 2.1.0)
> --------------------------------------------------------------------------
>
>                 Key: BEAM-10865
>                 URL: https://issues.apache.org/jira/browse/BEAM-10865
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Lourens Naudé
>            Assignee: Lourens Naudé
>            Priority: P2
>              Labels: KafkaIO, kafka
>   Original Estimate: 24h
>          Time Spent: 4h
>  Remaining Estimate: 20h
>
> References mailing list posts:
>  * Original [https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E]
>  * Reply from Luke [https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E]
> *Design decisions*
> The reason for using SpEL is that, with kafka-clients API < 2.1.0 as a dependency, compilation fails with:
> ```
>  required: String,byte[]
>  found: String,Headers,byte[]
>  reason: actual and formal argument lists differ in length
>  where T is a type-variable:
>  ```
> This is because the headers-aware default method only landed in 2.1.0, via [https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced].
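> For context, this is roughly the shape of the default method that 2.1.0 added to the `Deserializer` interface (paraphrased from the Kafka sources; other members omitted):
> ```
> import org.apache.kafka.common.header.Headers;
>
> public interface Deserializer<T> {
>   T deserialize(String topic, byte[] data);
>
>   // Added in kafka-clients 2.1.0; the default drops the headers and
>   // delegates to the two-argument overload.
>   default T deserialize(String topic, Headers headers, byte[] data) {
>     return deserialize(topic, data);
>   }
> }
> ```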
> I opted for `ConsumerSpEL#deserializeKey` and `ConsumerSpEL#deserializeValue` as the API to ensure forward-looking consistency for both `KafkaUnboundedReader` and `ReadFromKafkaDoFn`, as both already depended on an instance thereof.
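> To illustrate the approach, here is a minimal sketch of the version-guarded dispatch (an approximation with assumed names, not the actual `ConsumerSpEL` code): detect the headers-aware method once via reflection, then route each call through a SpEL expression, so that neither the 2.1.0 signature nor the `Headers` type is referenced at compile time.
> ```
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.springframework.expression.Expression;
> import org.springframework.expression.spel.standard.SpelExpressionParser;
> import org.springframework.expression.spel.support.StandardEvaluationContext;
>
> final class DeserializerDispatch {
>   // Resolved once: true only from kafka-clients 2.1.0 onwards.
>   private static final boolean HAS_HEADERS_API = detectHeadersApi();
>   // The method call lives in a SpEL string, so the Java source still
>   // compiles against kafka-clients < 2.1.0.
>   private static final Expression DESERIALIZE_KEY_WITH_HEADERS =
>       new SpelExpressionParser().parseExpression(
>           "#deserializer.deserialize(#record.topic(), #record.headers(), #record.key())");
>
>   private static boolean detectHeadersApi() {
>     try {
>       Class<?> headersClass = Class.forName("org.apache.kafka.common.header.Headers");
>       Deserializer.class.getMethod("deserialize", String.class, headersClass, byte[].class);
>       return true;
>     } catch (ReflectiveOperationException e) {
>       return false;
>     }
>   }
>
>   static <K> K deserializeKey(Deserializer<K> deserializer, ConsumerRecord<byte[], byte[]> rec) {
>     if (HAS_HEADERS_API) {
>       StandardEvaluationContext ctx = new StandardEvaluationContext();
>       ctx.setVariable("deserializer", deserializer);
>       ctx.setVariable("record", rec);
>       @SuppressWarnings("unchecked")
>       K key = (K) DESERIALIZE_KEY_WITH_HEADERS.getValue(ctx);
>       return key;
>     }
>     // Pre-2.1.0 clients: only the two-argument overload exists.
>     return deserializer.deserialize(rec.topic(), rec.key());
>   }
> }
> ```
> `deserializeValue` would look identical with `#record.value()`.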
> *Not so great things*
> Using the SpEL for kafka-client API 2.1.0 onwards effectively turns the deserialization path into a more expensive indirection, because the deserializer methods are called via reflection (twice per record: once for the key, once for the value):
> ```
>  at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58) <<<<<<<<<<<
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) <<<<<<<<<<<
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) <<<<<<<<<<<
>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) <<<<<<<<<<<
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
>  at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
>  at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
>  at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
>  at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
>  at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
>  at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
>  at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
>  at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
>  at org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
>  at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
>  at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
>  ```
> This effectively penalises the more recent Kafka API versions in favour of the older ones. I have not yet measured the overhead.
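> A rough way to get a first number (a hypothetical harness, not part of this change; JMH would be more rigorous) is to compare a direct `deserialize` call against the `Method.invoke` path that the SpEL evaluation bottoms out in:
> ```
> import java.lang.reflect.Method;
> import org.apache.kafka.common.serialization.LongDeserializer;
>
> public class DeserializeOverheadCheck {
>   public static void main(String[] args) throws Exception {
>     LongDeserializer deserializer = new LongDeserializer();
>     byte[] data = {0, 0, 0, 0, 0, 0, 0, 42}; // big-endian long 42
>     Method m = LongDeserializer.class.getMethod("deserialize", String.class, byte[].class);
>
>     final int n = 5_000_000;
>     // Untimed warmup pass so the JIT compiles both paths first.
>     for (int i = 0; i < n; i++) {
>       deserializer.deserialize("topic", data);
>       m.invoke(deserializer, "topic", data);
>     }
>
>     long t0 = System.nanoTime();
>     for (int i = 0; i < n; i++) {
>       deserializer.deserialize("topic", data);
>     }
>     long direct = System.nanoTime() - t0;
>
>     long t1 = System.nanoTime();
>     for (int i = 0; i < n; i++) {
>       m.invoke(deserializer, "topic", data);
>     }
>     long reflective = System.nanoTime() - t1;
>
>     System.out.printf("direct: %.1f ns/call, reflective: %.1f ns/call%n",
>         (double) direct / n, (double) reflective / n);
>   }
> }
> ```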
> *Other avenues explored*
> For runtime deserialization:
>  * Naively tried conditional compilation options, but the compiler cannot know which kafka-clients version will be used at runtime.
> For regression tests (that we don't stop passing headers in the future):
>  * I tried Mockito and Powermock implementations on both `LocalDeserializerProvider` and the Integer and Long serializers in tests, but found the stack to be too deep and backed out of that.
>  * Ditto for attempting to spy on `ConsumerRecord#headers()` (expecting it to be called twice as often with the newer API), but again the stack was too deep and hard to assert on. The call itself is interesting because the `ConsumerRecord` constructor used in the tests is not the one that sets headers, presumably also for client API compatibility.
>  * Evaluated the `ExtendedDeserializer` wrapper ([https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html]), but `ExtendedDeserializer` is deprecated API, so there is no point in bringing it in as a dependency.
> *How the current regression test works*
> Given that this feature concerns deserialization, and the whole test suite depends on the `Integer` (for keys) and `Long` (for values) deserializers, I figured it makes sense to implement a key and value deserializer that can assert the behaviour. Herein also lies somewhat of a problem: the test case is a bit weak, because I relied on stack frames to infer the caller of the `deserialize` method (the wide array of supported client versions makes anything else super complex). Unfortunately, the frames only provide class and method name context, with no argument count of 3 or argument types to assert on.
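> A condensed, hypothetical sketch of such an assertor (the actual one in `KafkaIOTest` differs in detail): extend the bundled deserializer and record which class/method called `deserialize`, since that is all the frames expose.
> ```
> import org.apache.kafka.common.serialization.IntegerDeserializer;
>
> public class IntegerDeserializerWithHeadersAssertor extends IntegerDeserializer {
>   @Override
>   public Integer deserialize(String topic, byte[] data) {
>     StackTraceElement[] frames = Thread.currentThread().getStackTrace();
>     // Frame 0 is Thread#getStackTrace, frame 1 is this method; walk outwards
>     // until we leave this class to find the interesting caller.
>     String caller = null;
>     for (int i = 1; i < frames.length; i++) {
>       if (!frames[i].getClassName().equals(getClass().getName())) {
>         caller = frames[i].getClassName() + "#" + frames[i].getMethodName();
>         break;
>       }
>     }
>     // Clients >= 2.1.0: the headers-aware default method delegates here, so the
>     // caller is org.apache.kafka.common.serialization.Deserializer#deserialize.
>     // Older clients: ConsumerSpEL#deserializeKey invokes this method directly.
>     System.out.println("deserialize called from: " + caller);
>     return super.deserialize(topic, data);
>   }
> }
> ```
> The traces below show what the frames look like for the two client generations.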
> Kafka client API 1.0.0 :
> ```
>  Frame 0: java.lang.Thread.getStackTrace
>  Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
>  Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
>  Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
>  ```
> For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning it was called directly and not via a default or actual implementation on `Deserializer`. Frames 1 and 2 are equal because of the `super.deserialize` call.
>  
> Kafka client API 2.1.0+ :
> ```
>  Frame 0: java.lang.Thread.getStackTrace
>  Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
>  Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
>  Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
>  ```
> For clients 2.1.0 and beyond, frame 3 is `org.apache.kafka.common.serialization.Deserializer#deserialize`. This is true for the bundled deserializers used in the tests because they delegate the call to the implementation on `Deserializer`. In practice this may refer to an actual override implementation.
> *Feedback items and questions*
>  * Any alternatives to the SpEL evaluation for this hot-path API? `consumer.seekToEnd` and `consumer.assign` are one-off / periodic APIs and not called as often as twice per record. (One possible direction is sketched after this list.)
>  * Ideas for a better way to test for regressions?
>  * Would it make sense to consider raising the minimum supported client API, in order to call the headers-aware `deserialize` directly and drop the indirection?
>  * If this implementation (and very likely iterations thereof :)) lands, would support for the same API on serialization be appreciated as well?
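> On the first question, one possible direction (an idea to consider, not something in the current patch): resolve the headers-aware method once as a `MethodHandle`, which is considerably cheaper per call than SpEL / `Method.invoke`. Headers are passed as `Object` below to keep the sketch free of compile-time references to the 2.1.0 signature and the `Headers` type:
> ```
> import java.lang.invoke.MethodHandle;
> import java.lang.invoke.MethodHandles;
> import java.lang.invoke.MethodType;
> import org.apache.kafka.common.serialization.Deserializer;
>
> final class HeadersAwareInvoker {
>   // Null when running against kafka-clients < 2.1.0.
>   private static final MethodHandle DESERIALIZE_WITH_HEADERS = resolve();
>
>   private static MethodHandle resolve() {
>     try {
>       Class<?> headersClass = Class.forName("org.apache.kafka.common.header.Headers");
>       return MethodHandles.lookup().findVirtual(
>           Deserializer.class,
>           "deserialize",
>           MethodType.methodType(Object.class, String.class, headersClass, byte[].class));
>     } catch (ReflectiveOperationException e) {
>       return null;
>     }
>   }
>
>   @SuppressWarnings("unchecked")
>   static <T> T deserialize(Deserializer<T> d, String topic, Object headers, byte[] data) {
>     try {
>       if (DESERIALIZE_WITH_HEADERS == null) {
>         return d.deserialize(topic, data); // pre-2.1.0 fallback: headers are dropped
>       }
>       return (T) DESERIALIZE_WITH_HEADERS.invoke(d, topic, headers, data);
>     } catch (Throwable t) {
>       throw new RuntimeException(t);
>     }
>   }
> }
> ```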
> Thanks for any consideration!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)