You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fr...@apache.org on 2022/09/17 12:42:33 UTC
[druid] branch master updated: kafka consumer: custom serializer can't be configured after its instantiation (#12960) (#13097)
This is an automated email from the ASF dual-hosted git repository.
frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new da30c8070a kafka consumer: custom serializer can't be configured after its instantiation (#12960) (#13097)
da30c8070a is described below
commit da30c8070a073b3d694e4aefd7bd076198d27c3c
Author: Ellen Shen <el...@gmail.com>
AuthorDate: Sat Sep 17 05:42:21 2022 -0700
kafka consumer: custom serializer can't be configured after its instantiation (#12960) (#13097)
* allow kafka custom serializer to be configured
* add unit tests
Co-authored-by: ellen shen <el...@apple.com>
---
.../druid/indexing/kafka/KafkaRecordSupplier.java | 14 +++--
.../indexing/kafka/KafkaRecordSupplierTest.java | 61 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 3 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index d64c0a64c5..f6d37ff6a9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -48,6 +48,7 @@ import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -234,7 +235,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
}
}
- private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
+ private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey, boolean isKey)
{
Deserializer deserializerObject;
try {
@@ -257,6 +258,13 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new StreamException(e);
}
+
+ Map<String, Object> configs = new HashMap<>();
+ for (String key : properties.stringPropertyNames()) {
+ configs.put(key, properties.get(key));
+ }
+
+ deserializerObject.configure(configs, isKey);
return deserializerObject;
}
@@ -272,8 +280,8 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(KafkaRecordSupplier.class.getClassLoader());
- Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
- Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+ Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer", true);
+ Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer", false);
return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 31a3ac0904..897a586985 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -57,6 +57,7 @@ public class KafkaRecordSupplierTest
{
private static String topic = "topic";
+ private static String additonal_parameter = "additional.parameter";
private static long poll_timeout_millis = 1000;
private static int pollRetry = 5;
private static int topicPosFix = 0;
@@ -156,6 +157,30 @@ public class KafkaRecordSupplierTest
}
}
+
+ public static class TestKafkaDeserializerRequiresParameter implements Deserializer<byte[]>
+ {
+ @Override
+ public void configure(Map<String, ?> map, boolean b)
+ {
+ if (!map.containsKey("additional.parameter")) {
+ throw new IllegalStateException("require additional.parameter configured");
+ }
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public byte[] deserialize(String topic, byte[] data)
+ {
+ return data;
+ }
+ }
+
@BeforeClass
public static void setupClass() throws Exception
{
@@ -245,6 +270,42 @@ public class KafkaRecordSupplierTest
recordSupplier.close();
}
+
+ @Test
+ public void testSupplierSetupCustomDeserializerRequiresParameter()
+ {
+
+ Map<String, Object> properties = kafkaServer.consumerProperties();
+ properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
+ properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
+ properties.put(additonal_parameter, "stringValue");
+
+ KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+ properties,
+ OBJECT_MAPPER
+ );
+
+ Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
+ recordSupplier.close();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt()
+ {
+
+ Map<String, Object> properties = kafkaServer.consumerProperties();
+ properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
+ properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
+
+ KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+ properties,
+ OBJECT_MAPPER
+ );
+
+ Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
+ recordSupplier.close();
+ }
+
@Test
public void testPollCustomDeserializer() throws InterruptedException, ExecutionException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org