You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/01/03 04:16:09 UTC

[incubator-druid] branch master updated: make kafka poll timeout can be configured (#6773)

This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6761663  make kafka poll timeout can be configured (#6773)
6761663 is described below

commit 6761663509025c5c88275a9752c96d417f879abe
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Thu Jan 3 12:16:02 2019 +0800

    make kafka poll timeout can be configured (#6773)
    
    * make kafka poll timeout can be configured
    
    * add doc
    
    * rename DEFAULT_POLL_TIMEOUT to DEFAULT_POLL_TIMEOUT_MILLIS
---
 .../development/extensions-core/kafka-ingestion.md |  1 +
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  2 +-
 .../druid/indexing/kafka/KafkaIndexTask.java       |  2 --
 .../indexing/kafka/KafkaIndexTaskIOConfig.java     | 11 ++++++++
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |  2 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  1 +
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  | 11 ++++++++
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 31 ++++++++++++++++++++++
 .../supervisor/KafkaSupervisorIOConfigTest.java    |  3 +++
 .../kafka/supervisor/KafkaSupervisorTest.java      |  2 ++
 10 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index aa771b7..af391bb 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -193,6 +193,7 @@ For Roaring bitmaps:
 |-----|----|-----------|--------|
 |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
 |`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.html) or String password.|yes|
+|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|no (default == 100)|
 |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against node failure.|no (default == 1)|
 |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
 |`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index e8f6262..6424c29 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -100,7 +100,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
     // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
     List<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList<>();
     try {
-      records = recordSupplier.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
+      records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
     }
     catch (OffsetOutOfRangeException e) {
       log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 22ede08..950441c 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -41,13 +41,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
 {
   private static final String TYPE = "index_kafka";
-  static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
 
   private final KafkaIndexTaskIOConfig ioConfig;
   private final ObjectMapper configMapper;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index fc5c287..af84bfc 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
 import org.joda.time.DateTime;
@@ -32,6 +33,7 @@ import java.util.Map;
 public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
 {
   private final Map<String, Object> consumerProperties;
+  private final long pollTimeout;
 
   @JsonCreator
   public KafkaIndexTaskIOConfig(
@@ -40,6 +42,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
       @JsonProperty("startPartitions") SeekableStreamPartitions<Integer, Long> startPartitions,
       @JsonProperty("endPartitions") SeekableStreamPartitions<Integer, Long> endPartitions,
       @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+      @JsonProperty("pollTimeout") Long pollTimeout,
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@@ -59,6 +62,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
     );
 
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
+    this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
 
     for (int partition : endPartitions.getPartitionSequenceNumberMap().keySet()) {
       Preconditions.checkArgument(
@@ -77,6 +81,12 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
     return consumerProperties;
   }
 
+  @JsonProperty
+  public long getPollTimeout()
+  {
+    return pollTimeout;
+  }
+
   @Override
   public String toString()
   {
@@ -86,6 +96,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
            ", startPartitions=" + getStartPartitions() +
            ", endPartitions=" + getEndPartitions() +
            ", consumerProperties=" + consumerProperties +
+           ", pollTimeout=" + pollTimeout +
            ", useTransaction=" + isUseTransaction() +
            ", minimumMessageTime=" + getMinimumMessageTime() +
            ", maximumMessageTime=" + getMaximumMessageTime() +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 53320c6..6086daf 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -393,7 +393,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
           // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
           ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
           try {
-            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
+            records = consumer.poll(task.getIOConfig().getPollTimeout());
           }
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6f4434e..0e9dcac 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -209,6 +209,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
         new SeekableStreamPartitions<>(kafkaIoConfig.getTopic(), startPartitions),
         new SeekableStreamPartitions<>(kafkaIoConfig.getTopic(), endPartitions),
         kafkaIoConfig.getConsumerProperties(),
+        kafkaIoConfig.getPollTimeout(),
         true,
         minimumMessageTime,
         maximumMessageTime,
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 80b842b..ddd0f06 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -34,8 +34,10 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
   public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
   public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
   public static final String KEY_PASSWORD_KEY = "ssl.key.password";
+  public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 100;
 
   private final Map<String, Object> consumerProperties;
+  private final long pollTimeout;
   private final boolean skipOffsetGaps;
 
   @JsonCreator
@@ -45,6 +47,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("taskCount") Integer taskCount,
       @JsonProperty("taskDuration") Period taskDuration,
       @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+      @JsonProperty("pollTimeout") Long pollTimeout,
       @JsonProperty("startDelay") Period startDelay,
       @JsonProperty("period") Period period,
       @JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@@ -72,6 +75,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
         consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
         StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
     );
+    this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
     this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
   }
 
@@ -88,6 +92,12 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
   }
 
   @JsonProperty
+  public long getPollTimeout()
+  {
+    return pollTimeout;
+  }
+
+  @JsonProperty
   public boolean isUseEarliestOffset()
   {
     return isUseEarliestSequenceNumber();
@@ -108,6 +118,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
            ", taskCount=" + getTaskCount() +
            ", taskDuration=" + getTaskDuration() +
            ", consumerProperties=" + consumerProperties +
+           ", pollTimeout=" + pollTimeout +
            ", startDelay=" + getStartDelay() +
            ", period=" + getPeriod() +
            ", useEarliestOffset=" + isUseEarliestOffset() +
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 72041eb..f448777 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -386,6 +387,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -428,6 +430,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -536,6 +539,7 @@ public class KafkaIndexTaskTest
             startPartitions,
             endPartitions,
             consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -663,6 +667,7 @@ public class KafkaIndexTaskTest
               startPartitions,
               endPartitions,
               consumerProps,
+              KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
               true,
               null,
               null,
@@ -803,6 +808,7 @@ public class KafkaIndexTaskTest
             startPartitions,
             endPartitions,
             consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -905,6 +911,7 @@ public class KafkaIndexTaskTest
             startPartitions,
             endPartitions,
             consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -941,6 +948,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             DateTimes.of("2010"),
             null,
@@ -995,6 +1003,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             DateTimes.of("2010"),
@@ -1059,6 +1068,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1119,6 +1129,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1160,6 +1171,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1212,6 +1224,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1267,6 +1280,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 7L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1311,6 +1325,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1393,6 +1408,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1453,6 +1469,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1467,6 +1484,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1521,6 +1539,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1535,6 +1554,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 3L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1590,6 +1610,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             false,
             null,
             null,
@@ -1604,6 +1625,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 3L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             false,
             null,
             null,
@@ -1664,6 +1686,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L, 1, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L, 1, 2L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1729,6 +1752,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1743,6 +1767,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(1, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(1, 1L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1799,6 +1824,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1836,6 +1862,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1888,6 +1915,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1972,6 +2000,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -2011,6 +2040,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -2065,6 +2095,7 @@ public class KafkaIndexTaskTest
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index cbf25de..3337faa 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -71,6 +71,7 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertEquals(1, (int) config.getTaskCount());
     Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
+    Assert.assertEquals(100, config.getPollTimeout());
     Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
     Assert.assertEquals(false, config.isUseEarliestOffset());
@@ -90,6 +91,7 @@ public class KafkaSupervisorIOConfigTest
                      + "  \"taskCount\": 9,\n"
                      + "  \"taskDuration\": \"PT30M\",\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+                     + "  \"pollTimeout\": 1000,\n"
                      + "  \"startDelay\": \"PT1M\",\n"
                      + "  \"period\": \"PT10S\",\n"
                      + "  \"useEarliestOffset\": true,\n"
@@ -113,6 +115,7 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertEquals(9, (int) config.getTaskCount());
     Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
+    Assert.assertEquals(1000, config.getPollTimeout());
     Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
     Assert.assertEquals(true, config.isUseEarliestOffset());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 427b9d1..a08806a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2793,6 +2793,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         taskCount,
         new Period(duration),
         consumerProperties,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
         useEarliestOffset,
@@ -2904,6 +2905,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             startPartitions,
             endPartitions,
             ImmutableMap.of("bootstrap.servers", kafkaHost),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             minimumMessageTime,
             maximumMessageTime,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org