You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fr...@apache.org on 2022/09/17 12:42:33 UTC

[druid] branch master updated: kafka consumer: custom serializer can't be configured after its instantiation (#12960) (#13097)

This is an automated email from the ASF dual-hosted git repository.

frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new da30c8070a kafka consumer: custom serializer can't be configured after its instantiation (#12960) (#13097)
da30c8070a is described below

commit da30c8070a073b3d694e4aefd7bd076198d27c3c
Author: Ellen Shen <el...@gmail.com>
AuthorDate: Sat Sep 17 05:42:21 2022 -0700

    kafka consumer: custom serializer can't be configured after its instantiation (#12960) (#13097)
    
    * allow Kafka custom serializer to be configured
    
      * add unit tests
    
    Co-authored-by: ellen shen <el...@apple.com>
---
 .../druid/indexing/kafka/KafkaRecordSupplier.java  | 14 +++--
 .../indexing/kafka/KafkaRecordSupplierTest.java    | 61 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index d64c0a64c5..f6d37ff6a9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -48,6 +48,7 @@ import java.lang.reflect.Type;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -234,7 +235,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
     }
   }
 
-  private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
+  private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey, boolean isKey)
   {
     Deserializer deserializerObject;
     try {
@@ -257,6 +258,13 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
     catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
       throw new StreamException(e);
     }
+
+    Map<String, Object> configs = new HashMap<>();
+    for (String key : properties.stringPropertyNames()) {
+      configs.put(key, properties.get(key));
+    }
+
+    deserializerObject.configure(configs, isKey);
     return deserializerObject;
   }
 
@@ -272,8 +280,8 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(KafkaRecordSupplier.class.getClassLoader());
-      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
-      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer", true);
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer", false);
 
       return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
     }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 31a3ac0904..897a586985 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -57,6 +57,7 @@ public class KafkaRecordSupplierTest
 {
 
   private static String topic = "topic";
+  private static String additonal_parameter = "additional.parameter";
   private static long poll_timeout_millis = 1000;
   private static int pollRetry = 5;
   private static int topicPosFix = 0;
@@ -156,6 +157,30 @@ public class KafkaRecordSupplierTest
     }
   }
 
+  /** Pass-through test deserializer whose configure() rejects any config map lacking "additional.parameter". */
+  public static class TestKafkaDeserializerRequiresParameter implements Deserializer<byte[]>
+  {
+    @Override
+    public void configure(Map<String, ?> map, boolean b)
+    {
+      if (!map.containsKey("additional.parameter")) {
+        throw new IllegalStateException("require additional.parameter configured");
+      }
+    }
+
+    @Override
+    public void close()
+    {
+      // Intentionally empty: nothing is acquired by this class in the code shown.
+    }
+
+    @Override
+    public byte[] deserialize(String topic, byte[] data)
+    {
+      return data; // bytes are handed back unchanged
+    }
+  }
+
   @BeforeClass
   public static void setupClass() throws Exception
   {
@@ -245,6 +270,42 @@ public class KafkaRecordSupplierTest
     recordSupplier.close();
   }
 
+  // Builds a supplier with TestKafkaDeserializerRequiresParameter for key and value, with its required property set.
+  @Test
+  public void testSupplierSetupCustomDeserializerRequiresParameter()
+  {
+    // additonal_parameter holds "additional.parameter", the key the test deserializer's configure() checks for.
+    Map<String, Object> properties = kafkaServer.consumerProperties();
+    properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
+    properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
+    properties.put(additonal_parameter, "stringValue");
+
+    KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+            properties,
+            OBJECT_MAPPER
+    );
+
+    Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
+    recordSupplier.close();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt()
+  {
+    // "additional.parameter" is deliberately left out; the annotation above expects IllegalStateException.
+    Map<String, Object> properties = kafkaServer.consumerProperties();
+    properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
+    properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
+
+    KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+            properties,
+            OBJECT_MAPPER
+    );
+
+    Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); // NOTE(review): presumably never reached if construction throws as expected - confirm
+    recordSupplier.close();
+  }
+
   @Test
   public void testPollCustomDeserializer() throws InterruptedException, ExecutionException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org