Posted to commits@druid.apache.org by gi...@apache.org on 2019/07/30 23:25:42 UTC

[incubator-druid] branch master updated: removed hard-coded Kafka key and value deserializer (#8112)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new aba65bb  removed hard-coded Kafka key and value deserializer (#8112)
aba65bb is described below

commit aba65bb675e0c34216c8d1d0efbe93f17842c1f4
Author: Aaron Bossert <aa...@punchcyber.com>
AuthorDate: Tue Jul 30 19:25:32 2019 -0400

    removed hard-coded Kafka key and value deserializer (#8112)
    
    * removed hard-coded Kafka key and value deserializers, leaving the default deserializer as org.apache.kafka.common.serialization.ByteArrayDeserializer. Also added checks to ensure that any provided deserializer class implements org.apache.kafka.common.serialization.Deserializer and outputs a byte array.
    
    * Addressed all comments from the original pull request and also added a
    unit test.
    
    * Added an additional test that uses "poll" to ensure that the custom deserializer
    works properly.
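
For illustration, the sketch below shows how a custom deserializer can now be passed through the consumer properties that back KafkaRecordSupplier. The class name com.example.MyByteArrayDeserializer and the bootstrap address are hypothetical stand-ins; any class supplied this way must implement org.apache.kafka.common.serialization.Deserializer and return byte[], and when the two deserializer keys are omitted the supplier still defaults to ByteArrayDeserializer.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;

public class CustomDeserializerWiringSketch
{
  public static void main(String[] args)
  {
    // com.example.MyByteArrayDeserializer is a placeholder; any class named here
    // must implement org.apache.kafka.common.serialization.Deserializer and its
    // deserialize() must return byte[], otherwise KafkaRecordSupplier rejects it
    // with an IllegalArgumentException when the consumer is created.
    Map<String, Object> consumerProperties = new HashMap<>();
    consumerProperties.put("bootstrap.servers", "localhost:9092");
    consumerProperties.put("key.deserializer", "com.example.MyByteArrayDeserializer");
    consumerProperties.put("value.deserializer", "com.example.MyByteArrayDeserializer");

    // Omitting key.deserializer/value.deserializer keeps the previous behavior:
    // org.apache.kafka.common.serialization.ByteArrayDeserializer is used.
    // A plain ObjectMapper stands in for Druid's configured mapper here.
    KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(consumerProperties, new ObjectMapper());
    recordSupplier.close();
  }
}
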
---
 .../druid/indexing/kafka/KafkaConsumerConfigs.java |  3 -
 .../druid/indexing/kafka/KafkaIndexTask.java       |  3 -
 .../druid/indexing/kafka/KafkaRecordSupplier.java  | 32 ++++++-
 .../indexing/kafka/KafkaRecordSupplierTest.java    | 98 +++++++++++++++++++++-
 4 files changed, 125 insertions(+), 11 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
index b5f7869..cdce155 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
 
 import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,8 +35,6 @@ public class KafkaConsumerConfigs
   {
     final Map<String, Object> props = new HashMap<>();
     props.put("metadata.max.age.ms", "10000");
-    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
-    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
     props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
     props.put("auto.offset.reset", "none");
     props.put("enable.auto.commit", "false");
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index fa900d2..b55d05b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -34,7 +34,6 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -154,8 +153,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
       final Map<String, Object> props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties());
 
       props.put("auto.offset.reset", "none");
-      props.put("key.deserializer", ByteArrayDeserializer.class.getName());
-      props.put("value.deserializer", ByteArrayDeserializer.class.getName());
 
       return new KafkaRecordSupplier(props, configMapper);
     }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index e40f77a..0859665 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -33,8 +33,12 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 
 import javax.annotation.Nonnull;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -196,7 +200,28 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
       }
     }
   }
-
+  
+  private Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
+  {
+    Deserializer deserializerObject;
+    try {
+      Class deserializerClass = Class.forName(properties.getProperty(kafkaConfigKey, ByteArrayDeserializer.class.getTypeName()));
+      Method deserializerMethod = deserializerClass.getMethod("deserialize", String.class, byte[].class);
+      
+      Type deserializerReturnType = deserializerMethod.getGenericReturnType();
+      
+      if (deserializerReturnType == byte[].class) {
+        deserializerObject = (Deserializer) deserializerClass.getConstructor().newInstance();
+      } else {
+        throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " + deserializerClass.getName() + " returns " + deserializerReturnType.getTypeName());
+      }
+    }
+    catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+      throw new StreamException(e);
+    }
+    return deserializerObject;
+  }
+  
   private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
   {
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
@@ -207,7 +232,10 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+  
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
     }
     finally {
       Thread.currentThread().setContextClassLoader(currCtxCl);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index b505884..476a193 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.TestHelper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,7 +55,7 @@ public class KafkaRecordSupplierTest
   private static int pollRetry = 5;
   private static int topicPosFix = 0;
   private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
-
+  
   private static TestingCluster zkServer;
   private static TestBroker kafkaServer;
 
@@ -125,7 +126,28 @@ public class KafkaRecordSupplierTest
       );
     }).collect(Collectors.toList());
   }
-
+  
+  public static class TestKafkaDeserializer implements Deserializer<byte[]>
+  {
+    @Override
+    public void configure(Map<String, ?> map, boolean b)
+    {
+    
+    }
+  
+    @Override
+    public void close()
+    {
+    
+    }
+  
+    @Override
+    public byte[] deserialize(String topic, byte[] data)
+    {
+      return data;
+    }
+  }
+  
   @BeforeClass
   public static void setupClass() throws Exception
   {
@@ -183,7 +205,77 @@ public class KafkaRecordSupplierTest
 
     recordSupplier.close();
   }
-
+  
+  @Test
+  public void testSupplierSetupCustomDeserializer() throws ExecutionException, InterruptedException
+  {
+    
+    // Insert data
+    insertData();
+    
+    Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
+        StreamPartition.of(topic, 0),
+        StreamPartition.of(topic, 1)
+    );
+    
+    Map<String, Object> properties = kafkaServer.consumerProperties();
+    properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+    properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+    
+    KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+        properties,
+        objectMapper
+    );
+    
+    Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
+    
+    recordSupplier.assign(partitions);
+    
+    Assert.assertEquals(partitions, recordSupplier.getAssignment());
+    Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
+    
+    recordSupplier.close();
+  }
+  
+  @Test
+  public void testPollCustomDeserializer() throws InterruptedException, ExecutionException
+  {
+    
+    // Insert data
+    insertData();
+    
+    Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
+        StreamPartition.of(topic, 0),
+        StreamPartition.of(topic, 1)
+    );
+  
+    Map<String, Object> properties = kafkaServer.consumerProperties();
+    properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+    properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+  
+    KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+        properties,
+        objectMapper
+    );
+    
+    recordSupplier.assign(partitions);
+    recordSupplier.seekToEarliest(partitions);
+    
+    List<OrderedPartitionableRecord<Integer, Long>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
+    
+    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+    for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
+      polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+      Thread.sleep(200);
+    }
+    
+    Assert.assertEquals(partitions, recordSupplier.getAssignment());
+    Assert.assertEquals(initialRecords.size(), polledRecords.size());
+    Assert.assertTrue(initialRecords.containsAll(polledRecords));
+    
+    recordSupplier.close();
+  }
+  
   @Test
   public void testPoll() throws InterruptedException, ExecutionException
   {
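
As a further illustration (not part of this commit), the sketch below shows a hypothetical custom value deserializer that Base64-decodes each record before Druid parses it. It satisfies the new check in KafkaRecordSupplier because its deserialize method returns byte[], so a class shaped like this could be supplied via the key.deserializer or value.deserializer consumer property.

import java.util.Base64;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class Base64ValueDeserializer implements Deserializer<byte[]>
{
  @Override
  public void configure(Map<String, ?> configs, boolean isKey)
  {
    // no configuration needed for this sketch
  }

  @Override
  public byte[] deserialize(String topic, byte[] data)
  {
    // decode the Base64-encoded payload back into raw bytes for Druid to parse
    return data == null ? null : Base64.getDecoder().decode(data);
  }

  @Override
  public void close()
  {
    // nothing to release
  }
}
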


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org