You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/08/14 20:46:37 UTC

[incubator-druid] branch master updated: Upgrade Kafka library for kafka-lookup module (#8078)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f3a996  Upgrade Kafka library for kafka-lookup module (#8078)
1f3a996 is described below

commit 1f3a99616d06a0131f4ae1b66ef37442f1b7c796
Author: Sayat <sa...@gmail.com>
AuthorDate: Wed Aug 14 22:46:25 2019 +0200

    Upgrade Kafka library for kafka-lookup module (#8078)
    
    * Upgrade Kafka library for kafka-lookup module
    
    * Update licenes.yaml
    
    * Adopt class workaround from KafkaRecordSupplier#getKafkaConsumer
    
    * Update lisences for kafka clients
---
 extensions-core/kafka-extraction-namespace/pom.xml |  22 +-
 .../query/lookup/KafkaLookupExtractorFactory.java  | 190 ++++++------
 .../lookup/KafkaLookupExtractorFactoryTest.java    | 118 +++-----
 .../query/lookup/TestKafkaExtractionCluster.java   | 320 +++++++--------------
 licenses.yaml                                      |  21 +-
 5 files changed, 250 insertions(+), 421 deletions(-)

diff --git a/extensions-core/kafka-extraction-namespace/pom.xml b/extensions-core/kafka-extraction-namespace/pom.xml
index fb1f27f..2867d12 100644
--- a/extensions-core/kafka-extraction-namespace/pom.xml
+++ b/extensions-core/kafka-extraction-namespace/pom.xml
@@ -33,6 +33,10 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
+  <properties>
+    <apache.kafka.version>2.1.0</apache.kafka.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.druid</groupId>
@@ -59,18 +63,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
-      <version>0.8.2.1</version>
+      <artifactId>kafka-clients</artifactId>
+      <version>${apache.kafka.version}</version>
       <exclusions>
         <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-        <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-api</artifactId>
         </exclusion>
@@ -99,6 +95,12 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>${apache.kafka.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>mx4j</groupId>
       <artifactId>mx4j-tools</artifactId>
       <version>3.0.1</version>
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index 01b3dd2..bd5ccf7 100644
--- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -30,26 +30,25 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.Whitelist;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.consumer.ZookeeperConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.Decoder;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.extraction.MapLookupExtractor;
 import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
-
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.validation.constraints.Min;
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
@@ -62,21 +61,11 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
 
 @JsonTypeName("kafka")
 public class KafkaLookupExtractorFactory implements LookupExtractorFactory
 {
   private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
-  static final Decoder<String> DEFAULT_STRING_DECODER = new Decoder<String>()
-  {
-    @Override
-    public String fromBytes(byte[] bytes)
-    {
-      return StringUtils.fromUtf8(bytes);
-    }
-  };
-
   private final ListeningExecutorService executorService;
   private final AtomicLong doubleEventCount = new AtomicLong(0L);
   private final NamespaceExtractionCacheManager cacheManager;
@@ -85,7 +74,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
   private final AtomicBoolean started = new AtomicBoolean(false);
   private CacheHandler cacheHandler;
 
-  private volatile ConsumerConnector consumerConnector;
   private volatile ListenableFuture<?> future = null;
 
   @JsonProperty
@@ -156,92 +144,57 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
     synchronized (started) {
       if (started.get()) {
         LOG.warn("Already started, not starting again");
-        return started.get();
+        return true;
       }
       if (executorService.isShutdown()) {
         LOG.warn("Already shut down, not starting again");
         return false;
       }
-      final Properties kafkaProperties = new Properties();
-      kafkaProperties.putAll(getKafkaProperties());
-      if (kafkaProperties.containsKey("group.id")) {
-        throw new IAE(
-            "Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
-            kafkaProperties.getProperty("group.id")
-        );
-      }
-      if (kafkaProperties.containsKey("auto.offset.reset")) {
-        throw new IAE(
-            "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
-            kafkaProperties.getProperty("auto.offset.reset")
-        );
-      }
-      Preconditions.checkNotNull(
-          kafkaProperties.getProperty("zookeeper.connect"),
-          "zookeeper.connect required property"
-      );
+      verifyKafkaProperties();
 
-      kafkaProperties.setProperty("group.id", factoryId);
       final String topic = getKafkaTopic();
       LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
       cacheHandler = cacheManager.createCache();
       final ConcurrentMap<String, String> map = cacheHandler.getCache();
       mapRef.set(map);
-      // Enable publish-subscribe
-      kafkaProperties.setProperty("auto.offset.reset", "smallest");
+
 
       final CountDownLatch startingReads = new CountDownLatch(1);
 
-      final ListenableFuture<?> future = executorService.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (!executorService.isShutdown()) {
-                consumerConnector = buildConnector(kafkaProperties);
-                try {
-                  if (executorService.isShutdown()) {
-                    break;
-                  }
-
-                  final List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
-                      new Whitelist(Pattern.quote(topic)), 1, DEFAULT_STRING_DECODER, DEFAULT_STRING_DECODER
-                  );
-
-                  if (streams == null || streams.isEmpty()) {
-                    throw new IAE("Topic [%s] had no streams", topic);
-                  }
-                  if (streams.size() > 1) {
-                    throw new ISE("Topic [%s] has %d streams! expected 1", topic, streams.size());
-                  }
-                  final KafkaStream<String, String> kafkaStream = streams.get(0);
-
-                  startingReads.countDown();
-
-                  for (final MessageAndMetadata<String, String> messageAndMetadata : kafkaStream) {
-                    final String key = messageAndMetadata.key();
-                    final String message = messageAndMetadata.message();
-                    if (key == null || message == null) {
-                      LOG.error("Bad key/message from topic [%s]: [%s]", topic, messageAndMetadata);
-                      continue;
-                    }
-                    doubleEventCount.incrementAndGet();
-                    map.put(key, message);
-                    doubleEventCount.incrementAndGet();
-                    LOG.trace("Placed key[%s] val[%s]", key, message);
-                  }
-                }
-                catch (Exception e) {
-                  LOG.error(e, "Error reading stream for topic [%s]", topic);
-                }
-                finally {
-                  consumerConnector.shutdown();
+      final ListenableFuture<?> future = executorService.submit(() -> {
+        final Consumer<String, String> consumer = getConsumer();
+        consumer.subscribe(Collections.singletonList(topic));
+        try {
+          while (!executorService.isShutdown()) {
+            try {
+              if (executorService.isShutdown()) {
+                break;
+              }
+              final ConsumerRecords<String, String> records = consumer.poll(1000);
+              startingReads.countDown();
+
+              for (final ConsumerRecord<String, String> record : records) {
+                final String key = record.key();
+                final String message = record.value();
+                if (key == null || message == null) {
+                  LOG.error("Bad key/message from topic [%s]: [%s]", topic, record);
+                  continue;
                 }
+                doubleEventCount.incrementAndGet();
+                map.put(key, message);
+                doubleEventCount.incrementAndGet();
+                LOG.trace("Placed key[%s] val[%s]", key, message);
               }
             }
+            catch (Exception e) {
+              LOG.error(e, "Error reading stream for topic [%s]", topic);
+            }
           }
-      );
+        }
+        finally {
+          consumer.close();
+        }
+      });
       Futures.addCallback(
           future,
           new FutureCallback<Object>()
@@ -293,14 +246,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
     }
   }
 
-  // Overridden in tests
-  ConsumerConnector buildConnector(Properties properties)
-  {
-    return new ZookeeperConsumerConnector(
-        new ConsumerConfig(properties)
-    );
-  }
-
   @Override
   public boolean close()
   {
@@ -312,10 +257,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
       started.set(false);
       executorService.shutdown();
 
-      if (consumerConnector != null) {
-        consumerConnector.shutdown();
-      }
-
       final ListenableFuture<?> future = this.future;
       if (future != null) {
         if (!future.isDone() && !future.cancel(false)) {
@@ -413,4 +354,51 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
   {
     return future;
   }
+
+  private void verifyKafkaProperties()
+  {
+    if (kafkaProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+      throw new IAE(
+              "Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
+              kafkaProperties.get(ConsumerConfig.GROUP_ID_CONFIG)
+      );
+    }
+    if (kafkaProperties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
+      throw new IAE(
+              "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
+              kafkaProperties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+      );
+    }
+    Preconditions.checkNotNull(
+            kafkaProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+            "bootstrap.servers required property"
+    );
+  }
+
+  // Overridden in tests
+  Consumer<String, String> getConsumer()
+  {
+    // Workaround for Kafka String Serializer could not be found
+    // Adopted from org.apache.druid.indexing.kafka.KafkaRecordSupplier#getKafkaConsumer
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    final Properties properties = getConsumerProperties();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      return new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @Nonnull
+  private Properties getConsumerProperties()
+  {
+    final Properties properties = new Properties();
+    properties.putAll(kafkaProperties);
+    // Enable publish-subscribe
+    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, factoryId);
+    return properties;
+  }
 }
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
index f50d65c..50fcca0 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
@@ -23,17 +23,14 @@ import com.fasterxml.jackson.databind.BeanProperty;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.Bytes;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.TopicFilter;
-import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Assert;
@@ -51,10 +48,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 @RunWith(PowerMockRunner.class)
@@ -278,17 +273,7 @@ public class KafkaLookupExtractorFactoryTest
   @Test
   public void testStartStop()
   {
-    final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
-    final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
-    final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
-    EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
-        EasyMock.anyObject(TopicFilter.class),
-        EasyMock.anyInt(),
-        EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
-        EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
-    )).andReturn(ImmutableList.of(kafkaStream)).once();
-    EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
-    EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
+    Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     EasyMock.expect(cacheManager.createCache())
             .andReturn(cacheHandler)
             .once();
@@ -296,34 +281,26 @@ public class KafkaLookupExtractorFactoryTest
     cacheHandler.close();
     EasyMock.expectLastCall();
 
-    final AtomicBoolean threadWasInterrupted = new AtomicBoolean(false);
-    consumerConnector.shutdown();
-    EasyMock.expectLastCall().andAnswer(() -> {
-      threadWasInterrupted.set(Thread.currentThread().isInterrupted());
-      return null;
-    }).times(2);
+    PowerMock.replay(cacheManager, cacheHandler);
 
-    PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
     final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
         cacheManager,
         TOPIC,
-        ImmutableMap.of("zookeeper.connect", "localhost"),
+        ImmutableMap.of("bootstrap.servers", "localhost"),
         10_000L,
         false
     )
     {
       @Override
-      ConsumerConnector buildConnector(Properties properties)
+      Consumer<String, String> getConsumer()
       {
-        return consumerConnector;
+        return kafkaConsumer;
       }
     };
 
     Assert.assertTrue(factory.start());
     Assert.assertTrue(factory.close());
     Assert.assertTrue(factory.getFuture().isDone());
-    Assert.assertFalse(threadWasInterrupted.get());
-
     PowerMock.verify(cacheManager, cacheHandler);
   }
 
@@ -334,20 +311,20 @@ public class KafkaLookupExtractorFactoryTest
     EasyMock.expect(cacheManager.createCache())
             .andReturn(cacheHandler)
             .once();
-    EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<String, String>()).once();
+    EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
     cacheHandler.close();
     EasyMock.expectLastCall();
     PowerMock.replay(cacheManager, cacheHandler);
     final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
         cacheManager,
         TOPIC,
-        ImmutableMap.of("zookeeper.connect", "localhost"),
+        ImmutableMap.of("bootstrap.servers", "localhost"),
         1,
         false
     )
     {
       @Override
-      ConsumerConnector buildConnector(Properties properties)
+      Consumer getConsumer()
       {
         // Lock up
         try {
@@ -368,36 +345,24 @@ public class KafkaLookupExtractorFactoryTest
   @Test
   public void testStartStopStart()
   {
-    final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
-    final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
-    final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
-    EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
-        EasyMock.anyObject(TopicFilter.class),
-        EasyMock.anyInt(),
-        EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
-        EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
-    )).andReturn(ImmutableList.of(kafkaStream)).once();
-    EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
-    EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
+    Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     EasyMock.expect(cacheManager.createCache())
             .andReturn(cacheHandler)
             .once();
     EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
     cacheHandler.close();
     EasyMock.expectLastCall().once();
-    consumerConnector.shutdown();
-    EasyMock.expectLastCall().times(2);
-    PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
+    PowerMock.replay(cacheManager, cacheHandler);
     final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
         cacheManager,
         TOPIC,
-        ImmutableMap.of("zookeeper.connect", "localhost")
+        ImmutableMap.of("bootstrap.servers", "localhost")
     )
     {
       @Override
-      ConsumerConnector buildConnector(Properties properties)
+      Consumer<String, String> getConsumer()
       {
-        return consumerConnector;
+        return kafkaConsumer;
       }
     };
     Assert.assertTrue(factory.start());
@@ -407,40 +372,28 @@ public class KafkaLookupExtractorFactoryTest
   }
 
   @Test
-  public void testStartStartStop()
+  public void testStartStartStopStop()
   {
-    final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
-    final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
-    final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
-    EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
-        EasyMock.anyObject(TopicFilter.class),
-        EasyMock.anyInt(),
-        EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
-        EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
-    )).andReturn(ImmutableList.of(kafkaStream)).once();
-    EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
-    EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
+    Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     EasyMock.expect(cacheManager.createCache())
             .andReturn(cacheHandler)
             .once();
     EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
     cacheHandler.close();
     EasyMock.expectLastCall().once();
-    consumerConnector.shutdown();
-    EasyMock.expectLastCall().times(3);
-    PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
+    PowerMock.replay(cacheManager, cacheHandler);
     final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
         cacheManager,
         TOPIC,
-        ImmutableMap.of("zookeeper.connect", "localhost"),
+        ImmutableMap.of("bootstrap.servers", "localhost"),
         10_000L,
         false
     )
     {
       @Override
-      ConsumerConnector buildConnector(Properties properties)
+      Consumer<String, String> getConsumer()
       {
-        return consumerConnector;
+        return kafkaConsumer;
       }
     };
     Assert.assertTrue(factory.start());
@@ -453,7 +406,12 @@ public class KafkaLookupExtractorFactoryTest
   @Test
   public void testStartFailsOnMissingConnect()
   {
-    expectedException.expectMessage("zookeeper.connect required property");
+    expectedException.expectMessage("bootstrap.servers required property");
+    EasyMock.expect(cacheManager.createCache())
+            .andReturn(cacheHandler)
+            .once();
+    EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
+    cacheHandler.close();
     PowerMock.replay(cacheManager);
     final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
         cacheManager,
@@ -470,7 +428,12 @@ public class KafkaLookupExtractorFactoryTest
   {
     expectedException.expectMessage(
         "Cannot set kafka property [group.id]. Property is randomly generated for you. Found");
-    PowerMock.replay(cacheManager);
+    EasyMock.expect(cacheManager.createCache())
+            .andReturn(cacheHandler)
+            .once();
+    EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>());
+    cacheHandler.close();
+    PowerMock.replay(cacheManager, cacheHandler);
     final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
         cacheManager,
         TOPIC,
@@ -486,6 +449,12 @@ public class KafkaLookupExtractorFactoryTest
   {
     expectedException.expectMessage(
         "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found ");
+    EasyMock.expect(cacheManager.createCache())
+            .andReturn(cacheHandler)
+            .once();
+    EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
+    cacheHandler.close();
+    EasyMock.expectLastCall();
     PowerMock.replay(cacheManager);
     final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
         cacheManager,
@@ -533,13 +502,6 @@ public class KafkaLookupExtractorFactoryTest
     Assert.assertEquals(injective, otherFactory.isInjective());
   }
 
-  @Test
-  public void testDefaultDecoder()
-  {
-    final String str = "some string";
-    Assert.assertEquals(str, KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER.fromBytes(StringUtils.toUtf8(str)));
-  }
-
   private IAnswer<Boolean> getBlockingAnswer()
   {
     return () -> {
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
index a830ba5..ac2ad60 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -25,34 +25,31 @@ import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
-import kafka.admin.AdminUtils;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.TestingCluster;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.initialization.Initialization;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule;
-import org.apache.zookeeper.CreateMode;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Time;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import scala.Some;
+import scala.collection.immutable.List$;
 
-import java.io.Closeable;
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -60,7 +57,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -76,166 +72,37 @@ public class TestKafkaExtractionCluster
 
   private final Closer closer = Closer.create();
 
+  private TestingCluster zkServer;
   private KafkaServer kafkaServer;
-  private KafkaConfig kafkaConfig;
-  private TestingServer zkTestServer;
-  private ZkClient zkClient;
   private Injector injector;
   private ObjectMapper mapper;
   private KafkaLookupExtractorFactory factory;
 
+  private static List<ProducerRecord<byte[], byte[]>> generateRecords()
+  {
+    return ImmutableList.of(
+            new ProducerRecord<>(topicName, 0,
+                    StringUtils.toUtf8("abcdefg"),
+                    StringUtils.toUtf8("abcdefg")));
+  }
+
   @Before
   public void setUp() throws Exception
   {
-    zkTestServer = new TestingServer(-1, temporaryFolder.newFolder(), true);
-    zkTestServer.start();
-
-    closer.register(new Closeable()
-    {
-      @Override
-      public void close() throws IOException
-      {
-        zkTestServer.stop();
-      }
-    });
-
-    zkClient = new ZkClient(
-        zkTestServer.getConnectString(),
-        10000,
-        10000,
-        ZKStringSerializer$.MODULE$
-    );
-    closer.register(new Closeable()
-    {
-      @Override
-      public void close()
-      {
-        zkClient.close();
-      }
-    });
-    if (!zkClient.exists("/kafka")) {
-      zkClient.create("/kafka", null, CreateMode.PERSISTENT);
-    }
-
-    log.info("---------------------------Started ZK---------------------------");
-
-    final String zkKafkaPath = "/kafka";
-
-    final Properties serverProperties = new Properties();
-    serverProperties.putAll(kafkaProperties);
-    serverProperties.put("broker.id", "0");
-    serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
-    serverProperties.put("log.cleaner.enable", "true");
-    serverProperties.put("host.name", "127.0.0.1");
-    serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
-    serverProperties.put("zookeeper.session.timeout.ms", "10000");
-    serverProperties.put("zookeeper.sync.time.ms", "200");
-    serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
+    zkServer = new TestingCluster(1);
+    zkServer.start();
 
-    kafkaConfig = new KafkaConfig(serverProperties);
-
-    final long time = DateTimes.of("2015-01-01").getMillis();
     kafkaServer = new KafkaServer(
-        kafkaConfig,
-        new Time()
-        {
-
-          @Override
-          public long milliseconds()
-          {
-            return time;
-          }
-
-          @Override
-          public long nanoseconds()
-          {
-            return TimeUnit.MILLISECONDS.toNanos(milliseconds());
-          }
-
-          @Override
-          public void sleep(long ms)
-          {
-            try {
-              Thread.sleep(ms);
-            }
-            catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        }
-    );
-    kafkaServer.startup();
-    closer.register(new Closeable()
-    {
-      @Override
-      public void close()
-      {
-        kafkaServer.shutdown();
-        kafkaServer.awaitShutdown();
-      }
-    });
-
-    int sleepCount = 0;
-
-    while (!kafkaServer.kafkaController().isActive()) {
-      Thread.sleep(100);
-      if (++sleepCount > 10) {
-        throw new InterruptedException("Controller took to long to awaken");
-      }
-    }
-
-    log.info("---------------------------Started Kafka Server---------------------------");
-
-    final ZkClient zkClient = new ZkClient(
-        zkTestServer.getConnectString() + zkKafkaPath, 10000, 10000,
-        ZKStringSerializer$.MODULE$
-    );
-
-    try (final AutoCloseable autoCloseable = new AutoCloseable()
-    {
-      @Override
-      public void close()
-      {
-        if (zkClient.exists(zkKafkaPath)) {
-          try {
-            zkClient.deleteRecursive(zkKafkaPath);
-          }
-          catch (ZkException ex) {
-            log.warn(ex, "error deleting %s zk node", zkKafkaPath);
-          }
-        }
-        zkClient.close();
-      }
-    }) {
-      final Properties topicProperties = new Properties();
-      topicProperties.put("cleanup.policy", "compact");
-      if (!AdminUtils.topicExists(zkClient, topicName)) {
-        AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProperties);
-      }
-
-      log.info("---------------------------Created topic---------------------------");
+          getBrokerProperties(),
+          Time.SYSTEM,
+          Some.apply(StringUtils.format("TestingBroker[%d]-", 1)),
+          List$.MODULE$.empty());
 
-      Assert.assertTrue(AdminUtils.topicExists(zkClient, topicName));
-    }
+    kafkaServer.startup();
+    log.info("---------------------------Started Kafka Broker ---------------------------");
 
-    final Properties kafkaProducerProperties = makeProducerProperties();
-    final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
-    try (final AutoCloseable autoCloseable = new AutoCloseable()
-    {
-      @Override
-      public void close()
-      {
-        producer.close();
-      }
-    }) {
-      producer.send(
-          new KeyedMessage<>(
-              topicName,
-              StringUtils.toUtf8("abcdefg"),
-              StringUtils.toUtf8("abcdefg")
-          )
-      );
-    }
+    log.info("---------------------------Publish Messages to topic-----------------------");
+    publishRecordsToKafka();
 
     System.setProperty("druid.extensions.searchCurrentClassloader", "false");
 
@@ -260,10 +127,7 @@ public class TestKafkaExtractionCluster
     mapper = injector.getInstance(ObjectMapper.class);
 
     log.info("--------------------------- placed default item via producer ---------------------------");
-    final Map<String, String> consumerProperties = new HashMap<>(kafkaProperties);
-    consumerProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
-    consumerProperties.put("zookeeper.session.timeout.ms", "10000");
-    consumerProperties.put("zookeeper.sync.time.ms", "200");
+    final Map<String, String> consumerProperties = getConsumerProperties();
 
     final KafkaLookupExtractorFactory kafkaLookupExtractorFactory = new KafkaLookupExtractorFactory(
         null,
@@ -278,17 +142,48 @@ public class TestKafkaExtractionCluster
     Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaTopic(), factory.getKafkaTopic());
     Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaProperties(), factory.getKafkaProperties());
     factory.start();
-    closer.register(new Closeable()
-    {
-      @Override
-      public void close()
-      {
-        factory.close();
-      }
-    });
+    closer.register(() -> factory.close());
     log.info("--------------------------- started rename manager ---------------------------");
   }
 
+  @Nonnull
+  private Map<String, String> getConsumerProperties()
+  {
+    final Map<String, String> props = new HashMap<>(kafkaProperties);
+    int port = kafkaServer.socketServer().config().port();
+    props.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
+    return props;
+  }
+
+  private void publishRecordsToKafka()
+  {
+    final Properties kafkaProducerProperties = makeProducerProperties();
+
+    try (final Producer<byte[], byte[]> producer = new KafkaProducer(kafkaProducerProperties)) {
+      generateRecords().forEach(producer::send);
+    }
+  }
+
+  @Nonnull
+  private KafkaConfig getBrokerProperties() throws IOException
+  {
+    final Properties serverProperties = new Properties();
+    serverProperties.putAll(kafkaProperties);
+    serverProperties.put("broker.id", "0");
+    serverProperties.put("zookeeper.connect", zkServer.getConnectString());
+    serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
+    serverProperties.put("auto.create.topics.enable", "true");
+    serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
+    serverProperties.put("num.partitions", "1");
+    serverProperties.put("offsets.topic.replication.factor", "1");
+    serverProperties.put("default.replication.factor", "1");
+    serverProperties.put("log.cleaner.enable", "true");
+    serverProperties.put("advertised.host.name", "localhost");
+    serverProperties.put("zookeeper.session.timeout.ms", "30000");
+    serverProperties.put("zookeeper.sync.time.ms", "200");
+    return new KafkaConfig(serverProperties);
+  }
+
   @After
   public void tearDown() throws Exception
   {
@@ -299,10 +194,11 @@ public class TestKafkaExtractionCluster
   {
     final Properties kafkaProducerProperties = new Properties();
     kafkaProducerProperties.putAll(kafkaProperties);
-    kafkaProducerProperties.put(
-        "metadata.broker.list",
-        StringUtils.format("127.0.0.1:%d", kafkaServer.socketServer().port())
-    );
+    int port = kafkaServer.socketServer().config().port();
+    kafkaProducerProperties.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
+    kafkaProducerProperties.put("key.serializer", ByteArraySerializer.class.getName());
+    kafkaProducerProperties.put("value.serializer", ByteArraySerializer.class.getName());
+    kafkaProducerProperties.put("acks", "all");
     kafkaProperties.put("request.required.acks", "1");
     return kafkaProducerProperties;
   }
@@ -315,56 +211,48 @@ public class TestKafkaExtractionCluster
   }
 
   @Test(timeout = 60_000L)
-  public void testSimpleRename() throws InterruptedException
+  public void testSimpleLookup() throws InterruptedException
   {
-    final Properties kafkaProducerProperties = makeProducerProperties();
-    final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
-    closer.register(new Closeable()
-    {
-      @Override
-      public void close()
-      {
-        producer.close();
-      }
-    });
-    checkServer();
+    try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) {
+      checkServer();
 
-    assertUpdated(null, "foo");
-    assertReverseUpdated(ImmutableList.of(), "foo");
+      assertUpdated(null, "foo");
+      assertReverseUpdated(ImmutableList.of(), "foo");
 
-    long events = factory.getCompletedEventCount();
+      long events = factory.getCompletedEventCount();
 
-    log.info("-------------------------     Sending foo bar     -------------------------------");
-    producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
+      log.info("-------------------------     Sending foo bar     -------------------------------");
+      producer.send(new ProducerRecord<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
 
-    long start = System.currentTimeMillis();
-    while (events == factory.getCompletedEventCount()) {
-      Thread.sleep(100);
-      if (System.currentTimeMillis() > start + 60_000) {
-        throw new ISE("Took too long to update event");
+      long start = System.currentTimeMillis();
+      while (events == factory.getCompletedEventCount()) {
+        Thread.sleep(100);
+        if (System.currentTimeMillis() > start + 60_000) {
+          throw new ISE("Took too long to update event");
+        }
       }
-    }
 
-    log.info("-------------------------     Checking foo bar     -------------------------------");
-    assertUpdated("bar", "foo");
-    assertReverseUpdated(Collections.singletonList("foo"), "bar");
-    assertUpdated(null, "baz");
+      log.info("-------------------------     Checking foo bar     -------------------------------");
+      assertUpdated("bar", "foo");
+      assertReverseUpdated(Collections.singletonList("foo"), "bar");
+      assertUpdated(null, "baz");
 
-    checkServer();
-    events = factory.getCompletedEventCount();
+      checkServer();
+      events = factory.getCompletedEventCount();
 
-    log.info("-------------------------     Sending baz bat     -------------------------------");
-    producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
-    while (events == factory.getCompletedEventCount()) {
-      Thread.sleep(10);
-      if (System.currentTimeMillis() > start + 60_000) {
-        throw new ISE("Took too long to update event");
+      log.info("-------------------------     Sending baz bat     -------------------------------");
+      producer.send(new ProducerRecord<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
+      while (events == factory.getCompletedEventCount()) {
+        Thread.sleep(10);
+        if (System.currentTimeMillis() > start + 60_000) {
+          throw new ISE("Took too long to update event");
+        }
       }
-    }
 
-    log.info("-------------------------     Checking baz bat     -------------------------------");
-    Assert.assertEquals("bat", factory.get().apply("baz"));
-    Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
+      log.info("-------------------------     Checking baz bat     -------------------------------");
+      Assert.assertEquals("bat", factory.get().apply("baz"));
+      Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
+    }
   }
 
   private void assertUpdated(
diff --git a/licenses.yaml b/licenses.yaml
index a39f2ee..09bfd64 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -2238,23 +2238,13 @@ name: Apache Kafka
 license_category: binary
 module: extensions/kafka-extraction-namespace
 license_name: Apache License version 2.0
-version: 0.8.2.1
+version: 2.1.0
 libraries:
-  - org.apache.kafka: kafka_2.10
+  - org.apache.kafka: kafka_2.12
   - org.apache.kafka: kafka-clients
 
 ---
 
-name: ZooKeeper Client
-license_category: binary
-module: extensions/kafka-extraction-namespace
-license_name: Apache License version 2.0
-version: 0.3
-libraries:
-  - com.101tec: zkclient
-
----
-
 name: Metrics Core Library
 license_category: binary
 module: extensions/kafka-extraction-namespace
@@ -2280,10 +2270,9 @@ libraries:
 name: Scala Library
 license_category: binary
 module: extensions/kafka-extraction-namespace
-license_name: BSD-3-Clause License
-copyright: EPFL, Lightbend Inc.
-version: 2.10.4
-license_file_path: licenses/bin/scala-lang.BSD3
+license_name: Apache License version 2.0
+copyright: LAMP/EPFL and Lightbend, Inc.
+version: 2.12.7
 libraries:
   - org.scala-lang: scala-library
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org