You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/08/14 20:46:37 UTC
[incubator-druid] branch master updated: Upgrade Kafka library for
kafka-lookup module (#8078)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1f3a996 Upgrade Kafka library for kafka-lookup module (#8078)
1f3a996 is described below
commit 1f3a99616d06a0131f4ae1b66ef37442f1b7c796
Author: Sayat <sa...@gmail.com>
AuthorDate: Wed Aug 14 22:46:25 2019 +0200
Upgrade Kafka library for kafka-lookup module (#8078)
* Upgrade Kafka library for kafka-lookup module
* Update licenes.yaml
* Adopt class workaround from KafkaRecordSupplier#getKafkaConsumer
* Update lisences for kafka clients
---
extensions-core/kafka-extraction-namespace/pom.xml | 22 +-
.../query/lookup/KafkaLookupExtractorFactory.java | 190 ++++++------
.../lookup/KafkaLookupExtractorFactoryTest.java | 118 +++-----
.../query/lookup/TestKafkaExtractionCluster.java | 320 +++++++--------------
licenses.yaml | 21 +-
5 files changed, 250 insertions(+), 421 deletions(-)
diff --git a/extensions-core/kafka-extraction-namespace/pom.xml b/extensions-core/kafka-extraction-namespace/pom.xml
index fb1f27f..2867d12 100644
--- a/extensions-core/kafka-extraction-namespace/pom.xml
+++ b/extensions-core/kafka-extraction-namespace/pom.xml
@@ -33,6 +33,10 @@
<relativePath>../../pom.xml</relativePath>
</parent>
+ <properties>
+ <apache.kafka.version>2.1.0</apache.kafka.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
@@ -59,18 +63,10 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.1</version>
+ <artifactId>kafka-clients</artifactId>
+ <version>${apache.kafka.version}</version>
<exclusions>
<exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
@@ -99,6 +95,12 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>${apache.kafka.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>mx4j</groupId>
<artifactId>mx4j-tools</artifactId>
<version>3.0.1</version>
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index 01b3dd2..bd5ccf7 100644
--- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -30,26 +30,25 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.Whitelist;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.consumer.ZookeeperConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.Decoder;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
-
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -62,21 +61,11 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
@JsonTypeName("kafka")
public class KafkaLookupExtractorFactory implements LookupExtractorFactory
{
private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
- static final Decoder<String> DEFAULT_STRING_DECODER = new Decoder<String>()
- {
- @Override
- public String fromBytes(byte[] bytes)
- {
- return StringUtils.fromUtf8(bytes);
- }
- };
-
private final ListeningExecutorService executorService;
private final AtomicLong doubleEventCount = new AtomicLong(0L);
private final NamespaceExtractionCacheManager cacheManager;
@@ -85,7 +74,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
private final AtomicBoolean started = new AtomicBoolean(false);
private CacheHandler cacheHandler;
- private volatile ConsumerConnector consumerConnector;
private volatile ListenableFuture<?> future = null;
@JsonProperty
@@ -156,92 +144,57 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
synchronized (started) {
if (started.get()) {
LOG.warn("Already started, not starting again");
- return started.get();
+ return true;
}
if (executorService.isShutdown()) {
LOG.warn("Already shut down, not starting again");
return false;
}
- final Properties kafkaProperties = new Properties();
- kafkaProperties.putAll(getKafkaProperties());
- if (kafkaProperties.containsKey("group.id")) {
- throw new IAE(
- "Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
- kafkaProperties.getProperty("group.id")
- );
- }
- if (kafkaProperties.containsKey("auto.offset.reset")) {
- throw new IAE(
- "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
- kafkaProperties.getProperty("auto.offset.reset")
- );
- }
- Preconditions.checkNotNull(
- kafkaProperties.getProperty("zookeeper.connect"),
- "zookeeper.connect required property"
- );
+ verifyKafkaProperties();
- kafkaProperties.setProperty("group.id", factoryId);
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
cacheHandler = cacheManager.createCache();
final ConcurrentMap<String, String> map = cacheHandler.getCache();
mapRef.set(map);
- // Enable publish-subscribe
- kafkaProperties.setProperty("auto.offset.reset", "smallest");
+
final CountDownLatch startingReads = new CountDownLatch(1);
- final ListenableFuture<?> future = executorService.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- while (!executorService.isShutdown()) {
- consumerConnector = buildConnector(kafkaProperties);
- try {
- if (executorService.isShutdown()) {
- break;
- }
-
- final List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
- new Whitelist(Pattern.quote(topic)), 1, DEFAULT_STRING_DECODER, DEFAULT_STRING_DECODER
- );
-
- if (streams == null || streams.isEmpty()) {
- throw new IAE("Topic [%s] had no streams", topic);
- }
- if (streams.size() > 1) {
- throw new ISE("Topic [%s] has %d streams! expected 1", topic, streams.size());
- }
- final KafkaStream<String, String> kafkaStream = streams.get(0);
-
- startingReads.countDown();
-
- for (final MessageAndMetadata<String, String> messageAndMetadata : kafkaStream) {
- final String key = messageAndMetadata.key();
- final String message = messageAndMetadata.message();
- if (key == null || message == null) {
- LOG.error("Bad key/message from topic [%s]: [%s]", topic, messageAndMetadata);
- continue;
- }
- doubleEventCount.incrementAndGet();
- map.put(key, message);
- doubleEventCount.incrementAndGet();
- LOG.trace("Placed key[%s] val[%s]", key, message);
- }
- }
- catch (Exception e) {
- LOG.error(e, "Error reading stream for topic [%s]", topic);
- }
- finally {
- consumerConnector.shutdown();
+ final ListenableFuture<?> future = executorService.submit(() -> {
+ final Consumer<String, String> consumer = getConsumer();
+ consumer.subscribe(Collections.singletonList(topic));
+ try {
+ while (!executorService.isShutdown()) {
+ try {
+ if (executorService.isShutdown()) {
+ break;
+ }
+ final ConsumerRecords<String, String> records = consumer.poll(1000);
+ startingReads.countDown();
+
+ for (final ConsumerRecord<String, String> record : records) {
+ final String key = record.key();
+ final String message = record.value();
+ if (key == null || message == null) {
+ LOG.error("Bad key/message from topic [%s]: [%s]", topic, record);
+ continue;
}
+ doubleEventCount.incrementAndGet();
+ map.put(key, message);
+ doubleEventCount.incrementAndGet();
+ LOG.trace("Placed key[%s] val[%s]", key, message);
}
}
+ catch (Exception e) {
+ LOG.error(e, "Error reading stream for topic [%s]", topic);
+ }
}
- );
+ }
+ finally {
+ consumer.close();
+ }
+ });
Futures.addCallback(
future,
new FutureCallback<Object>()
@@ -293,14 +246,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
}
}
- // Overridden in tests
- ConsumerConnector buildConnector(Properties properties)
- {
- return new ZookeeperConsumerConnector(
- new ConsumerConfig(properties)
- );
- }
-
@Override
public boolean close()
{
@@ -312,10 +257,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
started.set(false);
executorService.shutdown();
- if (consumerConnector != null) {
- consumerConnector.shutdown();
- }
-
final ListenableFuture<?> future = this.future;
if (future != null) {
if (!future.isDone() && !future.cancel(false)) {
@@ -413,4 +354,51 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
{
return future;
}
+
+ private void verifyKafkaProperties()
+ {
+ if (kafkaProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ throw new IAE(
+ "Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
+ kafkaProperties.get(ConsumerConfig.GROUP_ID_CONFIG)
+ );
+ }
+ if (kafkaProperties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
+ throw new IAE(
+ "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
+ kafkaProperties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+ );
+ }
+ Preconditions.checkNotNull(
+ kafkaProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+ "bootstrap.servers required property"
+ );
+ }
+
+ // Overridden in tests
+ Consumer<String, String> getConsumer()
+ {
+ // Workaround for Kafka String Serializer could not be found
+ // Adopted from org.apache.druid.indexing.kafka.KafkaRecordSupplier#getKafkaConsumer
+ ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+ final Properties properties = getConsumerProperties();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ return new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currCtxCl);
+ }
+ }
+
+ @Nonnull
+ private Properties getConsumerProperties()
+ {
+ final Properties properties = new Properties();
+ properties.putAll(kafkaProperties);
+ // Enable publish-subscribe
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, factoryId);
+ return properties;
+ }
}
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
index f50d65c..50fcca0 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
@@ -23,17 +23,14 @@ import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Bytes;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.TopicFilter;
-import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
@@ -51,10 +48,8 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@RunWith(PowerMockRunner.class)
@@ -278,17 +273,7 @@ public class KafkaLookupExtractorFactoryTest
@Test
public void testStartStop()
{
- final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
- final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
- final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
- EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
- EasyMock.anyObject(TopicFilter.class),
- EasyMock.anyInt(),
- EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
- EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
- )).andReturn(ImmutableList.of(kafkaStream)).once();
- EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
- EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
+ Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
@@ -296,34 +281,26 @@ public class KafkaLookupExtractorFactoryTest
cacheHandler.close();
EasyMock.expectLastCall();
- final AtomicBoolean threadWasInterrupted = new AtomicBoolean(false);
- consumerConnector.shutdown();
- EasyMock.expectLastCall().andAnswer(() -> {
- threadWasInterrupted.set(Thread.currentThread().isInterrupted());
- return null;
- }).times(2);
+ PowerMock.replay(cacheManager, cacheHandler);
- PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
- ImmutableMap.of("zookeeper.connect", "localhost"),
+ ImmutableMap.of("bootstrap.servers", "localhost"),
10_000L,
false
)
{
@Override
- ConsumerConnector buildConnector(Properties properties)
+ Consumer<String, String> getConsumer()
{
- return consumerConnector;
+ return kafkaConsumer;
}
};
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
Assert.assertTrue(factory.getFuture().isDone());
- Assert.assertFalse(threadWasInterrupted.get());
-
PowerMock.verify(cacheManager, cacheHandler);
}
@@ -334,20 +311,20 @@ public class KafkaLookupExtractorFactoryTest
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
- EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<String, String>()).once();
+ EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
cacheHandler.close();
EasyMock.expectLastCall();
PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
- ImmutableMap.of("zookeeper.connect", "localhost"),
+ ImmutableMap.of("bootstrap.servers", "localhost"),
1,
false
)
{
@Override
- ConsumerConnector buildConnector(Properties properties)
+ Consumer getConsumer()
{
// Lock up
try {
@@ -368,36 +345,24 @@ public class KafkaLookupExtractorFactoryTest
@Test
public void testStartStopStart()
{
- final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
- final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
- final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
- EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
- EasyMock.anyObject(TopicFilter.class),
- EasyMock.anyInt(),
- EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
- EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
- )).andReturn(ImmutableList.of(kafkaStream)).once();
- EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
- EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
+ Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
cacheHandler.close();
EasyMock.expectLastCall().once();
- consumerConnector.shutdown();
- EasyMock.expectLastCall().times(2);
- PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
+ PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
- ImmutableMap.of("zookeeper.connect", "localhost")
+ ImmutableMap.of("bootstrap.servers", "localhost")
)
{
@Override
- ConsumerConnector buildConnector(Properties properties)
+ Consumer<String, String> getConsumer()
{
- return consumerConnector;
+ return kafkaConsumer;
}
};
Assert.assertTrue(factory.start());
@@ -407,40 +372,28 @@ public class KafkaLookupExtractorFactoryTest
}
@Test
- public void testStartStartStop()
+ public void testStartStartStopStop()
{
- final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
- final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
- final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
- EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
- EasyMock.anyObject(TopicFilter.class),
- EasyMock.anyInt(),
- EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
- EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
- )).andReturn(ImmutableList.of(kafkaStream)).once();
- EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
- EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
+ Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
cacheHandler.close();
EasyMock.expectLastCall().once();
- consumerConnector.shutdown();
- EasyMock.expectLastCall().times(3);
- PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
+ PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
- ImmutableMap.of("zookeeper.connect", "localhost"),
+ ImmutableMap.of("bootstrap.servers", "localhost"),
10_000L,
false
)
{
@Override
- ConsumerConnector buildConnector(Properties properties)
+ Consumer<String, String> getConsumer()
{
- return consumerConnector;
+ return kafkaConsumer;
}
};
Assert.assertTrue(factory.start());
@@ -453,7 +406,12 @@ public class KafkaLookupExtractorFactoryTest
@Test
public void testStartFailsOnMissingConnect()
{
- expectedException.expectMessage("zookeeper.connect required property");
+ expectedException.expectMessage("bootstrap.servers required property");
+ EasyMock.expect(cacheManager.createCache())
+ .andReturn(cacheHandler)
+ .once();
+ EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
+ cacheHandler.close();
PowerMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
@@ -470,7 +428,12 @@ public class KafkaLookupExtractorFactoryTest
{
expectedException.expectMessage(
"Cannot set kafka property [group.id]. Property is randomly generated for you. Found");
- PowerMock.replay(cacheManager);
+ EasyMock.expect(cacheManager.createCache())
+ .andReturn(cacheHandler)
+ .once();
+ EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>());
+ cacheHandler.close();
+ PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
@@ -486,6 +449,12 @@ public class KafkaLookupExtractorFactoryTest
{
expectedException.expectMessage(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found ");
+ EasyMock.expect(cacheManager.createCache())
+ .andReturn(cacheHandler)
+ .once();
+ EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
+ cacheHandler.close();
+ EasyMock.expectLastCall();
PowerMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
@@ -533,13 +502,6 @@ public class KafkaLookupExtractorFactoryTest
Assert.assertEquals(injective, otherFactory.isInjective());
}
- @Test
- public void testDefaultDecoder()
- {
- final String str = "some string";
- Assert.assertEquals(str, KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER.fromBytes(StringUtils.toUtf8(str)));
- }
-
private IAnswer<Boolean> getBlockingAnswer()
{
return () -> {
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
index a830ba5..ac2ad60 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -25,34 +25,31 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
-import kafka.admin.AdminUtils;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.TestingCluster;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.initialization.Initialization;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule;
-import org.apache.zookeeper.CreateMode;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import scala.Some;
+import scala.collection.immutable.List$;
-import java.io.Closeable;
+import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -60,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
/**
*
@@ -76,166 +72,37 @@ public class TestKafkaExtractionCluster
private final Closer closer = Closer.create();
+ private TestingCluster zkServer;
private KafkaServer kafkaServer;
- private KafkaConfig kafkaConfig;
- private TestingServer zkTestServer;
- private ZkClient zkClient;
private Injector injector;
private ObjectMapper mapper;
private KafkaLookupExtractorFactory factory;
+ private static List<ProducerRecord<byte[], byte[]>> generateRecords()
+ {
+ return ImmutableList.of(
+ new ProducerRecord<>(topicName, 0,
+ StringUtils.toUtf8("abcdefg"),
+ StringUtils.toUtf8("abcdefg")));
+ }
+
@Before
public void setUp() throws Exception
{
- zkTestServer = new TestingServer(-1, temporaryFolder.newFolder(), true);
- zkTestServer.start();
-
- closer.register(new Closeable()
- {
- @Override
- public void close() throws IOException
- {
- zkTestServer.stop();
- }
- });
-
- zkClient = new ZkClient(
- zkTestServer.getConnectString(),
- 10000,
- 10000,
- ZKStringSerializer$.MODULE$
- );
- closer.register(new Closeable()
- {
- @Override
- public void close()
- {
- zkClient.close();
- }
- });
- if (!zkClient.exists("/kafka")) {
- zkClient.create("/kafka", null, CreateMode.PERSISTENT);
- }
-
- log.info("---------------------------Started ZK---------------------------");
-
- final String zkKafkaPath = "/kafka";
-
- final Properties serverProperties = new Properties();
- serverProperties.putAll(kafkaProperties);
- serverProperties.put("broker.id", "0");
- serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
- serverProperties.put("log.cleaner.enable", "true");
- serverProperties.put("host.name", "127.0.0.1");
- serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
- serverProperties.put("zookeeper.session.timeout.ms", "10000");
- serverProperties.put("zookeeper.sync.time.ms", "200");
- serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
+ zkServer = new TestingCluster(1);
+ zkServer.start();
- kafkaConfig = new KafkaConfig(serverProperties);
-
- final long time = DateTimes.of("2015-01-01").getMillis();
kafkaServer = new KafkaServer(
- kafkaConfig,
- new Time()
- {
-
- @Override
- public long milliseconds()
- {
- return time;
- }
-
- @Override
- public long nanoseconds()
- {
- return TimeUnit.MILLISECONDS.toNanos(milliseconds());
- }
-
- @Override
- public void sleep(long ms)
- {
- try {
- Thread.sleep(ms);
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- );
- kafkaServer.startup();
- closer.register(new Closeable()
- {
- @Override
- public void close()
- {
- kafkaServer.shutdown();
- kafkaServer.awaitShutdown();
- }
- });
-
- int sleepCount = 0;
-
- while (!kafkaServer.kafkaController().isActive()) {
- Thread.sleep(100);
- if (++sleepCount > 10) {
- throw new InterruptedException("Controller took to long to awaken");
- }
- }
-
- log.info("---------------------------Started Kafka Server---------------------------");
-
- final ZkClient zkClient = new ZkClient(
- zkTestServer.getConnectString() + zkKafkaPath, 10000, 10000,
- ZKStringSerializer$.MODULE$
- );
-
- try (final AutoCloseable autoCloseable = new AutoCloseable()
- {
- @Override
- public void close()
- {
- if (zkClient.exists(zkKafkaPath)) {
- try {
- zkClient.deleteRecursive(zkKafkaPath);
- }
- catch (ZkException ex) {
- log.warn(ex, "error deleting %s zk node", zkKafkaPath);
- }
- }
- zkClient.close();
- }
- }) {
- final Properties topicProperties = new Properties();
- topicProperties.put("cleanup.policy", "compact");
- if (!AdminUtils.topicExists(zkClient, topicName)) {
- AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProperties);
- }
-
- log.info("---------------------------Created topic---------------------------");
+ getBrokerProperties(),
+ Time.SYSTEM,
+ Some.apply(StringUtils.format("TestingBroker[%d]-", 1)),
+ List$.MODULE$.empty());
- Assert.assertTrue(AdminUtils.topicExists(zkClient, topicName));
- }
+ kafkaServer.startup();
+ log.info("---------------------------Started Kafka Broker ---------------------------");
- final Properties kafkaProducerProperties = makeProducerProperties();
- final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
- try (final AutoCloseable autoCloseable = new AutoCloseable()
- {
- @Override
- public void close()
- {
- producer.close();
- }
- }) {
- producer.send(
- new KeyedMessage<>(
- topicName,
- StringUtils.toUtf8("abcdefg"),
- StringUtils.toUtf8("abcdefg")
- )
- );
- }
+ log.info("---------------------------Publish Messages to topic-----------------------");
+ publishRecordsToKafka();
System.setProperty("druid.extensions.searchCurrentClassloader", "false");
@@ -260,10 +127,7 @@ public class TestKafkaExtractionCluster
mapper = injector.getInstance(ObjectMapper.class);
log.info("--------------------------- placed default item via producer ---------------------------");
- final Map<String, String> consumerProperties = new HashMap<>(kafkaProperties);
- consumerProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
- consumerProperties.put("zookeeper.session.timeout.ms", "10000");
- consumerProperties.put("zookeeper.sync.time.ms", "200");
+ final Map<String, String> consumerProperties = getConsumerProperties();
final KafkaLookupExtractorFactory kafkaLookupExtractorFactory = new KafkaLookupExtractorFactory(
null,
@@ -278,17 +142,48 @@ public class TestKafkaExtractionCluster
Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaTopic(), factory.getKafkaTopic());
Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaProperties(), factory.getKafkaProperties());
factory.start();
- closer.register(new Closeable()
- {
- @Override
- public void close()
- {
- factory.close();
- }
- });
+ closer.register(() -> factory.close());
log.info("--------------------------- started rename manager ---------------------------");
}
+ @Nonnull
+ private Map<String, String> getConsumerProperties()
+ {
+ final Map<String, String> props = new HashMap<>(kafkaProperties);
+ int port = kafkaServer.socketServer().config().port();
+ props.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
+ return props;
+ }
+
+ private void publishRecordsToKafka()
+ {
+ final Properties kafkaProducerProperties = makeProducerProperties();
+
+ try (final Producer<byte[], byte[]> producer = new KafkaProducer(kafkaProducerProperties)) {
+ generateRecords().forEach(producer::send);
+ }
+ }
+
+ @Nonnull
+ private KafkaConfig getBrokerProperties() throws IOException
+ {
+ final Properties serverProperties = new Properties();
+ serverProperties.putAll(kafkaProperties);
+ serverProperties.put("broker.id", "0");
+ serverProperties.put("zookeeper.connect", zkServer.getConnectString());
+ serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
+ serverProperties.put("auto.create.topics.enable", "true");
+ serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
+ serverProperties.put("num.partitions", "1");
+ serverProperties.put("offsets.topic.replication.factor", "1");
+ serverProperties.put("default.replication.factor", "1");
+ serverProperties.put("log.cleaner.enable", "true");
+ serverProperties.put("advertised.host.name", "localhost");
+ serverProperties.put("zookeeper.session.timeout.ms", "30000");
+ serverProperties.put("zookeeper.sync.time.ms", "200");
+ return new KafkaConfig(serverProperties);
+ }
+
@After
public void tearDown() throws Exception
{
@@ -299,10 +194,11 @@ public class TestKafkaExtractionCluster
{
final Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.putAll(kafkaProperties);
- kafkaProducerProperties.put(
- "metadata.broker.list",
- StringUtils.format("127.0.0.1:%d", kafkaServer.socketServer().port())
- );
+ int port = kafkaServer.socketServer().config().port();
+ kafkaProducerProperties.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
+ kafkaProducerProperties.put("key.serializer", ByteArraySerializer.class.getName());
+ kafkaProducerProperties.put("value.serializer", ByteArraySerializer.class.getName());
+ kafkaProducerProperties.put("acks", "all");
kafkaProperties.put("request.required.acks", "1");
return kafkaProducerProperties;
}
@@ -315,56 +211,48 @@ public class TestKafkaExtractionCluster
}
@Test(timeout = 60_000L)
- public void testSimpleRename() throws InterruptedException
+ public void testSimpleLookup() throws InterruptedException
{
- final Properties kafkaProducerProperties = makeProducerProperties();
- final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
- closer.register(new Closeable()
- {
- @Override
- public void close()
- {
- producer.close();
- }
- });
- checkServer();
+ try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) {
+ checkServer();
- assertUpdated(null, "foo");
- assertReverseUpdated(ImmutableList.of(), "foo");
+ assertUpdated(null, "foo");
+ assertReverseUpdated(ImmutableList.of(), "foo");
- long events = factory.getCompletedEventCount();
+ long events = factory.getCompletedEventCount();
- log.info("------------------------- Sending foo bar -------------------------------");
- producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
+ log.info("------------------------- Sending foo bar -------------------------------");
+ producer.send(new ProducerRecord<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
- long start = System.currentTimeMillis();
- while (events == factory.getCompletedEventCount()) {
- Thread.sleep(100);
- if (System.currentTimeMillis() > start + 60_000) {
- throw new ISE("Took too long to update event");
+ long start = System.currentTimeMillis();
+ while (events == factory.getCompletedEventCount()) {
+ Thread.sleep(100);
+ if (System.currentTimeMillis() > start + 60_000) {
+ throw new ISE("Took too long to update event");
+ }
}
- }
- log.info("------------------------- Checking foo bar -------------------------------");
- assertUpdated("bar", "foo");
- assertReverseUpdated(Collections.singletonList("foo"), "bar");
- assertUpdated(null, "baz");
+ log.info("------------------------- Checking foo bar -------------------------------");
+ assertUpdated("bar", "foo");
+ assertReverseUpdated(Collections.singletonList("foo"), "bar");
+ assertUpdated(null, "baz");
- checkServer();
- events = factory.getCompletedEventCount();
+ checkServer();
+ events = factory.getCompletedEventCount();
- log.info("------------------------- Sending baz bat -------------------------------");
- producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
- while (events == factory.getCompletedEventCount()) {
- Thread.sleep(10);
- if (System.currentTimeMillis() > start + 60_000) {
- throw new ISE("Took too long to update event");
+ log.info("------------------------- Sending baz bat -------------------------------");
+ producer.send(new ProducerRecord<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
+ while (events == factory.getCompletedEventCount()) {
+ Thread.sleep(10);
+ if (System.currentTimeMillis() > start + 60_000) {
+ throw new ISE("Took too long to update event");
+ }
}
- }
- log.info("------------------------- Checking baz bat -------------------------------");
- Assert.assertEquals("bat", factory.get().apply("baz"));
- Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
+ log.info("------------------------- Checking baz bat -------------------------------");
+ Assert.assertEquals("bat", factory.get().apply("baz"));
+ Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
+ }
}
private void assertUpdated(
diff --git a/licenses.yaml b/licenses.yaml
index a39f2ee..09bfd64 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -2238,23 +2238,13 @@ name: Apache Kafka
license_category: binary
module: extensions/kafka-extraction-namespace
license_name: Apache License version 2.0
-version: 0.8.2.1
+version: 2.1.0
libraries:
- - org.apache.kafka: kafka_2.10
+ - org.apache.kafka: kafka_2.12
- org.apache.kafka: kafka-clients
---
-name: ZooKeeper Client
-license_category: binary
-module: extensions/kafka-extraction-namespace
-license_name: Apache License version 2.0
-version: 0.3
-libraries:
- - com.101tec: zkclient
-
----
-
name: Metrics Core Library
license_category: binary
module: extensions/kafka-extraction-namespace
@@ -2280,10 +2270,9 @@ libraries:
name: Scala Library
license_category: binary
module: extensions/kafka-extraction-namespace
-license_name: BSD-3-Clause License
-copyright: EPFL, Lightbend Inc.
-version: 2.10.4
-license_file_path: licenses/bin/scala-lang.BSD3
+license_name: Apache License version 2.0
+copyright: LAMP/EPFL and Lightbend, Inc.
+version: 2.12.7
libraries:
- org.scala-lang: scala-library
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org