You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/07/30 23:25:42 UTC
[incubator-druid] branch master updated: removed hard-coded Kafka
key and value deserializer (#8112)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new aba65bb removed hard-coded Kafka key and value deserializer (#8112)
aba65bb is described below
commit aba65bb675e0c34216c8d1d0efbe93f17842c1f4
Author: Aaron Bossert <aa...@punchcyber.com>
AuthorDate: Tue Jul 30 19:25:32 2019 -0400
removed hard-coded Kafka key and value deserializer (#8112)
* removed hard-coded Kafka key and value deserializer, leaving default deserializer as org.apache.kafka.common.serialization.ByteArrayDeserializer. Also added checks to ensure that any provided deserializer class implements org.apache.kafka.common.serialization.Deserializer and outputs a byte array.
* Addressed all comments from original pull request and also added a
unit test.
* Added additional test that uses "poll" to ensure that custom deserializer
works properly.
---
.../druid/indexing/kafka/KafkaConsumerConfigs.java | 3 -
.../druid/indexing/kafka/KafkaIndexTask.java | 3 -
.../druid/indexing/kafka/KafkaRecordSupplier.java | 32 ++++++-
.../indexing/kafka/KafkaRecordSupplierTest.java | 98 +++++++++++++++++++++-
4 files changed, 125 insertions(+), 11 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
index b5f7869..cdce155 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.util.HashMap;
import java.util.Map;
@@ -36,8 +35,6 @@ public class KafkaConsumerConfigs
{
final Map<String, Object> props = new HashMap<>();
props.put("metadata.max.age.ms", "10000");
- props.put("key.deserializer", ByteArrayDeserializer.class.getName());
- props.put("value.deserializer", ByteArrayDeserializer.class.getName());
props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
props.put("auto.offset.reset", "none");
props.put("enable.auto.commit", "false");
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index fa900d2..b55d05b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -34,7 +34,6 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -154,8 +153,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
final Map<String, Object> props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties());
props.put("auto.offset.reset", "none");
- props.put("key.deserializer", ByteArrayDeserializer.class.getName());
- props.put("value.deserializer", ByteArrayDeserializer.class.getName());
return new KafkaRecordSupplier(props, configMapper);
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index e40f77a..0859665 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -33,8 +33,12 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
import javax.annotation.Nonnull;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -196,7 +200,28 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
}
}
}
-
+
+ private Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
+ {
+ Deserializer deserializerObject;
+ try {
+ Class deserializerClass = Class.forName(properties.getProperty(kafkaConfigKey, ByteArrayDeserializer.class.getTypeName()));
+ Method deserializerMethod = deserializerClass.getMethod("deserialize", String.class, byte[].class);
+
+ Type deserializerReturnType = deserializerMethod.getGenericReturnType();
+
+ if (deserializerReturnType == byte[].class) {
+ deserializerObject = (Deserializer) deserializerClass.getConstructor().newInstance();
+ } else {
+ throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " + deserializerClass.getName() + " returns " + deserializerReturnType.getTypeName());
+ }
+ }
+ catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ throw new StreamException(e);
+ }
+ return deserializerObject;
+ }
+
private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
@@ -207,7 +232,10 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
- return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+ Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+ return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index b505884..476a193 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.TestHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -54,7 +55,7 @@ public class KafkaRecordSupplierTest
private static int pollRetry = 5;
private static int topicPosFix = 0;
private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
-
+
private static TestingCluster zkServer;
private static TestBroker kafkaServer;
@@ -125,7 +126,28 @@ public class KafkaRecordSupplierTest
);
}).collect(Collectors.toList());
}
-
+
+ public static class TestKafkaDeserializer implements Deserializer<byte[]>
+ {
+ @Override
+ public void configure(Map<String, ?> map, boolean b)
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public byte[] deserialize(String topic, byte[] data)
+ {
+ return data;
+ }
+ }
+
@BeforeClass
public static void setupClass() throws Exception
{
@@ -183,7 +205,77 @@ public class KafkaRecordSupplierTest
recordSupplier.close();
}
-
+
+ @Test
+ public void testSupplierSetupCustomDeserializer() throws ExecutionException, InterruptedException
+ {
+
+ // Insert data
+ insertData();
+
+ Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
+ StreamPartition.of(topic, 0),
+ StreamPartition.of(topic, 1)
+ );
+
+ Map<String, Object> properties = kafkaServer.consumerProperties();
+ properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+ properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+
+ KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+ properties,
+ objectMapper
+ );
+
+ Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
+
+ recordSupplier.assign(partitions);
+
+ Assert.assertEquals(partitions, recordSupplier.getAssignment());
+ Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
+
+ recordSupplier.close();
+ }
+
+ @Test
+ public void testPollCustomDeserializer() throws InterruptedException, ExecutionException
+ {
+
+ // Insert data
+ insertData();
+
+ Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
+ StreamPartition.of(topic, 0),
+ StreamPartition.of(topic, 1)
+ );
+
+ Map<String, Object> properties = kafkaServer.consumerProperties();
+ properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+ properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+
+ KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+ properties,
+ objectMapper
+ );
+
+ recordSupplier.assign(partitions);
+ recordSupplier.seekToEarliest(partitions);
+
+ List<OrderedPartitionableRecord<Integer, Long>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
+
+ List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+ for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
+ polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals(partitions, recordSupplier.getAssignment());
+ Assert.assertEquals(initialRecords.size(), polledRecords.size());
+ Assert.assertTrue(initialRecords.containsAll(polledRecords));
+
+ recordSupplier.close();
+ }
+
@Test
public void testPoll() throws InterruptedException, ExecutionException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org