Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/10 16:33:24 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #12794: [BEAM-10865] Support for Kafka deserialization API with headers (since Kafka API 2.1.0)

lukecwik commented on a change in pull request #12794:
URL: https://github.com/apache/beam/pull/12794#discussion_r486477159



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
##########
@@ -90,6 +98,16 @@ public ConsumerSpEL() {
     } catch (NoSuchMethodException | SecurityException e) {
       LOG.debug("OffsetsForTimes is not available.");
     }
+
+    try {
+      // It is supported by Kafka Client 2.1.0 onwards.
+      Method method =
+          Deserializer.class.getDeclaredMethod(
+              "deserialize", String.class, Headers.class, byte[].class);
+      deserializerSupportsHeaders = method.isDefault();

Review comment:
       Why does it matter that this is a default method?
   Shouldn't we just set this to true as long as the method exists?
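
   A minimal sketch of the check the review suggests, assuming the Kafka 2.x client classes are on the classpath; it reuses the deserializerSupportsHeaders field and LOG from the surrounding diff and sets the flag whenever the method is found, without looking at whether it is a default method:

       import org.apache.kafka.common.header.Headers;
       import org.apache.kafka.common.serialization.Deserializer;

       try {
         // Deserializer#deserialize(String, Headers, byte[]) exists since Kafka client 2.1.0.
         // Its presence alone is enough to enable the header-aware code path.
         Deserializer.class.getDeclaredMethod(
             "deserialize", String.class, Headers.class, byte[].class);
         deserializerSupportsHeaders = true;
       } catch (NoSuchMethodException | SecurityException e) {
         LOG.debug("Deserializer with headers is not available.");
       }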
   
   

##########
File path: sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
##########
@@ -501,6 +503,83 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() {
     p.run();
   }
 
+  public static class IntegerDeserializerWithHeadersAssertor extends IntegerDeserializer
+      implements Deserializer<Integer> {
+    ConsumerSpEL consumerSpEL = null;
+
+    @Override
+    public Integer deserialize(String topic, byte[] data) {
+      StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();

Review comment:
       In build.gradle you'll want to set up a JUnit test run that uses the Kafka 2.1.0 client and executes these tests.
   
   This is a pretty good example of how to get this going:
   https://docs.gradle.org/current/userguide/java_testing.html#sec:configuring_java_integration_tests
   
   You shouldn't need any additional source sets, but you'll want to add a kafka210 configuration that pulls in that dependency, plus a test task that runs against it.
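
   A rough build.gradle sketch of that setup, following the Gradle docs linked above; the kafka210 configuration name, the kafkaVersion210Test task name, and the classpath wiring are illustrative rather than the final Beam build code:

       configurations {
         kafka210
       }

       dependencies {
         kafka210 "org.apache.kafka:kafka-clients:2.1.0"
       }

       task kafkaVersion210Test(type: Test) {
         description = "Runs the KafkaIO tests against the Kafka 2.1.0 client."
         testClassesDirs = sourceSets.test.output.classesDirs
         // List the pinned client first so it shadows the default kafka-clients
         // version already present on the test runtime classpath.
         classpath = configurations.kafka210 + sourceSets.test.runtimeClasspath
       }

       check.dependsOn kafkaVersion210Test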




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org