Posted to commits@druid.apache.org by ji...@apache.org on 2019/03/21 18:19:56 UTC

[incubator-druid] branch master updated: Consolidate kafka consumer configs (#7249)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e170203  Consolidate kafka consumer configs (#7249)
e170203 is described below

commit e17020387601cdebb7db6a0a8a3c3052aaf66064
Author: Surekha <su...@imply.io>
AuthorDate: Thu Mar 21 11:19:49 2019 -0700

    Consolidate kafka consumer configs (#7249)
    
    * Consolidate kafka consumer configs
    
    * change the order of adding properties
    
    * Add consumer properties to fix test
    
    it seems the kafka consumer does not receive any messages without these configs
    
    * Use KafkaConsumerConfigs in integration test
    
    * Update zookeeper and kafka versions in the setup.sh for the base druid image
    
    *  use version 0.2 of base druid image
    
    * Try to fix tests in KafkaRecordSupplierTest
    
    * unused import
    
    * Fix tests in KafkaSupervisorTest
---
 .../druid/indexing/kafka/KafkaConsumerConfigs.java | 48 ++++++++++++++++++++++
 .../druid/indexing/kafka/KafkaIndexTask.java       |  9 +---
 .../druid/indexing/kafka/KafkaRecordSupplier.java  | 10 +----
 .../indexing/kafka/KafkaRecordSupplierTest.java    |  6 ++-
 .../kafka/supervisor/KafkaSupervisorTest.java      |  5 +--
 .../druid/indexing/kafka/test/TestBroker.java      |  9 +---
 .../seekablestream/common/RecordSupplier.java      |  2 -
 .../supervisor/SeekableStreamSupervisor.java       |  9 +++-
 integration-tests/docker-base/setup.sh             | 10 ++---
 integration-tests/docker/Dockerfile                |  2 +-
 .../tests/indexer/ITKafkaIndexingServiceTest.java  |  5 ++-
 11 files changed, 78 insertions(+), 37 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
new file mode 100644
index 0000000..b5f7869
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Common place to keep all kafka consumer configs
+ */
+public class KafkaConsumerConfigs
+{
+
+  public static Map<String, Object> getConsumerProperties()
+  {
+    final Map<String, Object> props = new HashMap<>();
+    props.put("metadata.max.age.ms", "10000");
+    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
+    props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
+    props.put("auto.offset.reset", "none");
+    props.put("enable.auto.commit", "false");
+    props.put("isolation.level", "read_committed");
+    return props;
+  }
+
+}
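
For context, a minimal usage sketch (hypothetical caller, not part of this commit) of how these shared defaults are meant to be layered over user-supplied settings; KafkaIndexTask and KafkaRecordSupplier below follow the same pattern:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Sketch only: userProps stands in for the user-supplied consumerProperties.
    final Map<String, Object> userProps = new HashMap<>();
    userProps.put("bootstrap.servers", "localhost:9092");

    final Properties props = new Properties();
    props.putAll(userProps);                                    // user config first
    props.putAll(KafkaConsumerConfigs.getConsumerProperties()); // shared defaults last
    final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
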
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 4fcbbe1..df56cce 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -96,19 +96,14 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
     try {
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
 
+      final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
       final Properties props = new Properties();
-
       KafkaRecordSupplier.addConsumerPropertiesFromConfig(
           props,
           configMapper,
           ioConfig.getConsumerProperties()
       );
-
-      props.setProperty("enable.auto.commit", "false");
-      props.setProperty("auto.offset.reset", "none");
-      props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
-      props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
-      props.setProperty("isolation.level", "read_committed");
+      props.putAll(consumerConfigs);
 
       return new KafkaConsumer<>(props);
     }
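
Worth noting on the hunk above: because props.putAll(consumerConfigs) runs after addConsumerPropertiesFromConfig, the consolidated defaults win any key collision with the user's consumerProperties (plain java.util.Map semantics: the last write to a key wins). A minimal illustration:

    final Properties props = new Properties();
    props.setProperty("enable.auto.commit", "true");            // user-supplied value
    props.putAll(KafkaConsumerConfigs.getConsumerProperties()); // defaults applied last
    // props.getProperty("enable.auto.commit") now returns "false"
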
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index be25c49..fcd1673 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -25,9 +25,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
-import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.PasswordProvider;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -192,14 +190,10 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
 
   private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
   {
+    final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
     final Properties props = new Properties();
-
-    props.setProperty("metadata.max.age.ms", "10000");
-    props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
-
     addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
-
-    props.setProperty("enable.auto.commit", "false");
+    props.putAll(consumerConfigs);
 
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 54b494d..b505884 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -28,7 +28,6 @@ import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.TestHelper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -49,7 +48,7 @@ import java.util.stream.Collectors;
 
 public class KafkaRecordSupplierTest
 {
-  private static final Logger log = new Logger(KafkaRecordSupplierTest.class);
+
   private static String topic = "topic";
   private static long poll_timeout_millis = 1000;
   private static int pollRetry = 5;
@@ -313,6 +312,7 @@ public class KafkaRecordSupplierTest
         kafkaServer.consumerProperties(), objectMapper);
 
     recordSupplier.assign(partitions);
+    recordSupplier.seekToEarliest(partitions);
 
     Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition0));
     Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
@@ -355,6 +355,7 @@ public class KafkaRecordSupplierTest
         kafkaServer.consumerProperties(), objectMapper);
 
     recordSupplier.assign(partitions);
+    recordSupplier.seekToEarliest(partitions);
 
     Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition0));
     Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
@@ -413,6 +414,7 @@ public class KafkaRecordSupplierTest
         kafkaServer.consumerProperties(), objectMapper);
 
     recordSupplier.assign(partitions);
+    recordSupplier.seekToEarliest(partitions);
 
     Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition0));
     Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition1));
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index f2db280..bde9052 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
 import org.apache.druid.indexing.kafka.KafkaIndexTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
@@ -104,7 +105,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -2780,10 +2780,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
       String kafkaHost
   )
   {
-    Map<String, Object> consumerProperties = new HashMap<>();
+    final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
     consumerProperties.put("myCustomKey", "myCustomValue");
     consumerProperties.put("bootstrap.servers", kafkaHost);
-    consumerProperties.put("isolation.level", "read_committed");
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
         topic,
         replicas,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
index 10c9b2e..359048c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
@@ -24,10 +24,10 @@ import com.google.common.io.Files;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.apache.commons.io.FileUtils;
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
 import scala.Some;
@@ -123,13 +123,8 @@ public class TestBroker implements Closeable
 
   public Map<String, Object> consumerProperties()
   {
-    final Map<String, Object> props = new HashMap<>();
+    final Map<String, Object> props = KafkaConsumerConfigs.getConsumerProperties();
     props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
-    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
-    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
-    props.put("group.id", String.valueOf(RANDOM.nextInt()));
-    props.put("auto.offset.reset", "earliest");
-    props.put("isolation.level", "read_committed");
     return props;
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index d9e599d..31c343b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -40,8 +40,6 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType> extends Clo
 {
   /**
    * assigns the given partitions to this RecordSupplier
-   * and seek to the earliest sequence number. Previously
-   * assigned partitions will be replaced.
    *
   * @param partitions partitions to assign
    */
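
With the implicit seek removed from assign(), and with the shared defaults setting auto.offset.reset to "none" (TestBroker previously used "earliest"), a freshly assigned consumer has no fallback position; that is presumably why the tests above now seek explicitly. A sketch of the revised calling pattern, with recordSupplier and partitions typed as in those tests (seekToEarliest can throw InterruptedException):

    recordSupplier.assign(partitions);         // assigns only; no longer seeks
    recordSupplier.seekToEarliest(partitions); // set a position before polling
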
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index d76cc87..fc94ca2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2441,7 +2441,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     synchronized (recordSupplierLock) {
       StreamPartition<PartitionIdType> topicPartition = new StreamPartition<>(ioConfig.getStream(), partition);
       if (!recordSupplier.getAssignment().contains(topicPartition)) {
-        recordSupplier.assign(Collections.singleton(topicPartition));
+        final Set<StreamPartition<PartitionIdType>> partitions = Collections.singleton(topicPartition);
+        recordSupplier.assign(partitions);
+        try {
+          recordSupplier.seekToEarliest(partitions);
+        }
+        catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
       }
 
       return useEarliestOffset
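
One design note on the catch block above: wrapping InterruptedException in a RuntimeException discards the thread's interrupt status. A common alternative (not what this commit does) restores the flag before rethrowing:

    catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status for callers
      throw new RuntimeException(e);
    }
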
diff --git a/integration-tests/docker-base/setup.sh b/integration-tests/docker-base/setup.sh
index 9b69ca0..13ad92f 100644
--- a/integration-tests/docker-base/setup.sh
+++ b/integration-tests/docker-base/setup.sh
@@ -34,14 +34,14 @@ apt-get install -y mysql-server
 apt-get install -y supervisor
 
 # Zookeeper
-wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz | tar -xzf - -C /usr/local \
-  && cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg \
-  && ln -s /usr/local/zookeeper-3.4.11 /usr/local/zookeeper
+wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz | tar -xzf - -C /usr/local \
+  && cp /usr/local/zookeeper-3.4.13/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.13/conf/zoo.cfg \
+  && ln -s /usr/local/zookeeper-3.4.13 /usr/local/zookeeper
 
 # Kafka
 # Match the version to the Kafka client used by KafkaSupervisor
-wget -q -O - http://www.us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz | tar -xzf - -C /usr/local \
- && ln -s /usr/local/kafka_2.12-0.10.2.2 /usr/local/kafka
+wget -q -O - http://www.us.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz | tar -xzf - -C /usr/local \
+ && ln -s /usr/local/kafka_2.12-2.1.0 /usr/local/kafka
 
 # Druid system user
 adduser --system --group --no-create-home druid \
diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index e3f3155..6c5094c 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 # Base image is built from integration-tests/docker-base in the Druid repo
-FROM imply/druiditbase
+FROM imply/druiditbase:0.2
 
 RUN echo "[mysqld]\ncharacter-set-server=utf8\ncollation-server=utf8_bin\n" >> /etc/mysql/my.cnf
 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
index 7447b2f..d297fda 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
@@ -27,6 +27,7 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -147,7 +148,9 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
     String spec;
     try {
       LOG.info("supervisorSpec name: [%s]", INDEXER_FILE);
-      Properties consumerProperties = new Properties();
+      final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
+      final Properties consumerProperties = new Properties();
+      consumerProperties.putAll(consumerConfigs);
       consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
       addFilteredProperties(consumerProperties);
 

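A small aside on consumerProperties.putAll(consumerConfigs) above: java.util.Properties extends Hashtable<Object, Object>, so putAll with a Map<String, Object> compiles, and it is safe here because every value returned by getConsumerProperties() is a String. A non-String value would be stored but invisible to getProperty:

    final Properties props = new Properties();
    props.put("isolation.level", "read_committed"); // String value
    props.put("opaque", new Object());              // non-String value
    props.getProperty("isolation.level");           // "read_committed"
    props.getProperty("opaque");                    // null: getProperty only returns Strings
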
