You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/03/21 18:19:56 UTC
[incubator-druid] branch master updated: Consolidate kafka consumer
configs (#7249)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new e170203 Consolidate kafka consumer configs (#7249)
e170203 is described below
commit e17020387601cdebb7db6a0a8a3c3052aaf66064
Author: Surekha <su...@imply.io>
AuthorDate: Thu Mar 21 11:19:49 2019 -0700
Consolidate kafka consumer configs (#7249)
* Consolidate kafka consumer configs
* change the order of adding properties
* Add consumer properties to fix test
it seems kafka consumer does not receive any message without these configs
* Use KafkaConsumerConfigs in integration test
* Update zookeeper and kafka versions in the setup.sh for the base druid image
* use version 0.2 of base druid image
* Try to fix tests in KafkaRecordSupplierTest
* unused import
* Fix tests in KafkaSupervisorTest
---
.../druid/indexing/kafka/KafkaConsumerConfigs.java | 48 ++++++++++++++++++++++
.../druid/indexing/kafka/KafkaIndexTask.java | 9 +---
.../druid/indexing/kafka/KafkaRecordSupplier.java | 10 +----
.../indexing/kafka/KafkaRecordSupplierTest.java | 6 ++-
.../kafka/supervisor/KafkaSupervisorTest.java | 5 +--
.../druid/indexing/kafka/test/TestBroker.java | 9 +---
.../seekablestream/common/RecordSupplier.java | 2 -
.../supervisor/SeekableStreamSupervisor.java | 9 +++-
integration-tests/docker-base/setup.sh | 10 ++---
integration-tests/docker/Dockerfile | 2 +-
.../tests/indexer/ITKafkaIndexingServiceTest.java | 5 ++-
11 files changed, 78 insertions(+), 37 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
new file mode 100644
index 0000000..b5f7869
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Common place to keep all kafka consumer configs
+ */
+public class KafkaConsumerConfigs
+{
+
+ public static Map<String, Object> getConsumerProperties()
+ {
+ final Map<String, Object> props = new HashMap<>();
+ props.put("metadata.max.age.ms", "10000");
+ props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+ props.put("value.deserializer", ByteArrayDeserializer.class.getName());
+ props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
+ props.put("auto.offset.reset", "none");
+ props.put("enable.auto.commit", "false");
+ props.put("isolation.level", "read_committed");
+ return props;
+ }
+
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 4fcbbe1..df56cce 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -96,19 +96,14 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties props = new Properties();
-
KafkaRecordSupplier.addConsumerPropertiesFromConfig(
props,
configMapper,
ioConfig.getConsumerProperties()
);
-
- props.setProperty("enable.auto.commit", "false");
- props.setProperty("auto.offset.reset", "none");
- props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
- props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
- props.setProperty("isolation.level", "read_committed");
+ props.putAll(consumerConfigs);
return new KafkaConsumer<>(props);
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index be25c49..fcd1673 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -25,9 +25,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
-import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -192,14 +190,10 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
{
+ final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties props = new Properties();
-
- props.setProperty("metadata.max.age.ms", "10000");
- props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
-
addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
-
- props.setProperty("enable.auto.commit", "false");
+ props.putAll(consumerConfigs);
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 54b494d..b505884 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -28,7 +28,6 @@ import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.TestHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -49,7 +48,7 @@ import java.util.stream.Collectors;
public class KafkaRecordSupplierTest
{
- private static final Logger log = new Logger(KafkaRecordSupplierTest.class);
+
private static String topic = "topic";
private static long poll_timeout_millis = 1000;
private static int pollRetry = 5;
@@ -313,6 +312,7 @@ public class KafkaRecordSupplierTest
kafkaServer.consumerProperties(), objectMapper);
recordSupplier.assign(partitions);
+ recordSupplier.seekToEarliest(partitions);
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition0));
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
@@ -355,6 +355,7 @@ public class KafkaRecordSupplierTest
kafkaServer.consumerProperties(), objectMapper);
recordSupplier.assign(partitions);
+ recordSupplier.seekToEarliest(partitions);
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition0));
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
@@ -413,6 +414,7 @@ public class KafkaRecordSupplierTest
kafkaServer.consumerProperties(), objectMapper);
recordSupplier.assign(partitions);
+ recordSupplier.seekToEarliest(partitions);
Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition0));
Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition1));
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index f2db280..bde9052 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
@@ -104,7 +105,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -2780,10 +2780,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
String kafkaHost
)
{
- Map<String, Object> consumerProperties = new HashMap<>();
+ final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
- consumerProperties.put("isolation.level", "read_committed");
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
replicas,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
index 10c9b2e..359048c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
@@ -24,10 +24,10 @@ import com.google.common.io.Files;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import scala.Some;
@@ -123,13 +123,8 @@ public class TestBroker implements Closeable
public Map<String, Object> consumerProperties()
{
- final Map<String, Object> props = new HashMap<>();
+ final Map<String, Object> props = KafkaConsumerConfigs.getConsumerProperties();
props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
- props.put("key.deserializer", ByteArrayDeserializer.class.getName());
- props.put("value.deserializer", ByteArrayDeserializer.class.getName());
- props.put("group.id", String.valueOf(RANDOM.nextInt()));
- props.put("auto.offset.reset", "earliest");
- props.put("isolation.level", "read_committed");
return props;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index d9e599d..31c343b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -40,8 +40,6 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType> extends Clo
{
/**
* assigns the given partitions to this RecordSupplier
- * and seek to the earliest sequence number. Previously
- * assigned partitions will be replaced.
*
* @param partitions parititions to assign
*/
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index d76cc87..fc94ca2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2441,7 +2441,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
synchronized (recordSupplierLock) {
StreamPartition<PartitionIdType> topicPartition = new StreamPartition<>(ioConfig.getStream(), partition);
if (!recordSupplier.getAssignment().contains(topicPartition)) {
- recordSupplier.assign(Collections.singleton(topicPartition));
+ final Set partitions = Collections.singleton(topicPartition);
+ recordSupplier.assign(partitions);
+ try {
+ recordSupplier.seekToEarliest(partitions);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
return useEarliestOffset
diff --git a/integration-tests/docker-base/setup.sh b/integration-tests/docker-base/setup.sh
index 9b69ca0..13ad92f 100644
--- a/integration-tests/docker-base/setup.sh
+++ b/integration-tests/docker-base/setup.sh
@@ -34,14 +34,14 @@ apt-get install -y mysql-server
apt-get install -y supervisor
# Zookeeper
-wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz | tar -xzf - -C /usr/local \
- && cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg \
- && ln -s /usr/local/zookeeper-3.4.11 /usr/local/zookeeper
+wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz | tar -xzf - -C /usr/local \
+ && cp /usr/local/zookeeper-3.4.13/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.13/conf/zoo.cfg \
+ && ln -s /usr/local/zookeeper-3.4.13 /usr/local/zookeeper
# Kafka
# Match the version to the Kafka client used by KafkaSupervisor
-wget -q -O - http://www.us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz | tar -xzf - -C /usr/local \
- && ln -s /usr/local/kafka_2.12-0.10.2.2 /usr/local/kafka
+wget -q -O - http://www.us.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz | tar -xzf - -C /usr/local \
+ && ln -s /usr/local/kafka_2.12-2.1.0 /usr/local/kafka
# Druid system user
adduser --system --group --no-create-home druid \
diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index e3f3155..6c5094c 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -14,7 +14,7 @@
# limitations under the License.
# Base image is built from integration-tests/docker-base in the Druid repo
-FROM imply/druiditbase
+FROM imply/druiditbase:0.2
RUN echo "[mysqld]\ncharacter-set-server=utf8\ncollation-server=utf8_bin\n" >> /etc/mysql/my.cnf
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
index 7447b2f..d297fda 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
@@ -27,6 +27,7 @@ import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -147,7 +148,9 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
String spec;
try {
LOG.info("supervisorSpec name: [%s]", INDEXER_FILE);
- Properties consumerProperties = new Properties();
+ final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
+ final Properties consumerProperties = new Properties();
+ consumerProperties.putAll(consumerConfigs);
consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
addFilteredProperties(consumerProperties);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org