You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/01/03 04:16:04 UTC

[GitHub] asdf2014 closed pull request #6773: make kafka poll timeout can be configured

asdf2014 closed pull request #6773: make kafka poll timeout can be configured
URL: https://github.com/apache/incubator-druid/pull/6773
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index aa771b7a27c..af391bbae73 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -193,6 +193,7 @@ For Roaring bitmaps:
 |-----|----|-----------|--------|
 |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
 |`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.html) or String password.|yes|
+|`pollTimeout`|Long|The length of time to wait for the kafka consumer to poll records, in milliseconds|no (default == 100)|
 |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against node failure.|no (default == 1)|
 |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
 |`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index e8f62620866..6424c290fc9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -100,7 +100,7 @@ protected Long getSequenceNumberToStoreAfterRead(@NotNull Long sequenceNumber)
     // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
     List<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList<>();
     try {
-      records = recordSupplier.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
+      records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
     }
     catch (OffsetOutOfRangeException e) {
       log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 22ede084b3a..950441c3c59 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -41,13 +41,11 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
 {
   private static final String TYPE = "index_kafka";
-  static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
 
   private final KafkaIndexTaskIOConfig ioConfig;
   private final ObjectMapper configMapper;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index fc5c28751bc..af84bfcd4ab 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
 import org.joda.time.DateTime;
@@ -32,6 +33,7 @@
 public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
 {
   private final Map<String, Object> consumerProperties;
+  private final long pollTimeout;
 
   @JsonCreator
   public KafkaIndexTaskIOConfig(
@@ -40,6 +42,7 @@ public KafkaIndexTaskIOConfig(
       @JsonProperty("startPartitions") SeekableStreamPartitions<Integer, Long> startPartitions,
       @JsonProperty("endPartitions") SeekableStreamPartitions<Integer, Long> endPartitions,
       @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+      @JsonProperty("pollTimeout") Long pollTimeout,
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@@ -59,6 +62,7 @@ public KafkaIndexTaskIOConfig(
     );
 
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
+    this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
 
     for (int partition : endPartitions.getPartitionSequenceNumberMap().keySet()) {
       Preconditions.checkArgument(
@@ -77,6 +81,12 @@ public KafkaIndexTaskIOConfig(
     return consumerProperties;
   }
 
+  @JsonProperty
+  public long getPollTimeout()
+  {
+    return pollTimeout;
+  }
+
   @Override
   public String toString()
   {
@@ -86,6 +96,7 @@ public String toString()
            ", startPartitions=" + getStartPartitions() +
            ", endPartitions=" + getEndPartitions() +
            ", consumerProperties=" + consumerProperties +
+           ", pollTimeout=" + pollTimeout +
            ", useTransaction=" + isUseTransaction() +
            ", minimumMessageTime=" + getMinimumMessageTime() +
            ", maximumMessageTime=" + getMaximumMessageTime() +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 53320c664da..6086daf9d1b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -393,7 +393,7 @@ public void run()
           // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
           ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
           try {
-            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
+            records = consumer.poll(task.getIOConfig().getPollTimeout());
           }
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6f4434ea3e9..0e9dcacc815 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -209,6 +209,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
         new SeekableStreamPartitions<>(kafkaIoConfig.getTopic(), startPartitions),
         new SeekableStreamPartitions<>(kafkaIoConfig.getTopic(), endPartitions),
         kafkaIoConfig.getConsumerProperties(),
+        kafkaIoConfig.getPollTimeout(),
         true,
         minimumMessageTime,
         maximumMessageTime,
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 80b842b8349..ddd0f06d205 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -34,8 +34,10 @@
   public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
   public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
   public static final String KEY_PASSWORD_KEY = "ssl.key.password";
+  public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 100;
 
   private final Map<String, Object> consumerProperties;
+  private final long pollTimeout;
   private final boolean skipOffsetGaps;
 
   @JsonCreator
@@ -45,6 +47,7 @@ public KafkaSupervisorIOConfig(
       @JsonProperty("taskCount") Integer taskCount,
       @JsonProperty("taskDuration") Period taskDuration,
       @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+      @JsonProperty("pollTimeout") Long pollTimeout,
       @JsonProperty("startDelay") Period startDelay,
       @JsonProperty("period") Period period,
       @JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@@ -72,6 +75,7 @@ public KafkaSupervisorIOConfig(
         consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
         StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
     );
+    this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
     this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
   }
 
@@ -87,6 +91,12 @@ public String getTopic()
     return consumerProperties;
   }
 
+  @JsonProperty
+  public long getPollTimeout()
+  {
+    return pollTimeout;
+  }
+
   @JsonProperty
   public boolean isUseEarliestOffset()
   {
@@ -108,6 +118,7 @@ public String toString()
            ", taskCount=" + getTaskCount() +
            ", taskDuration=" + getTaskDuration() +
            ", consumerProperties=" + consumerProperties +
+           ", pollTimeout=" + pollTimeout +
            ", startDelay=" + getStartDelay() +
            ", period=" + getPeriod() +
            ", useEarliestOffset=" + isUseEarliestOffset() +
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 72041eb947d..f44877771fb 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -69,6 +69,7 @@
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -386,6 +387,7 @@ public void testRunAfterDataInserted() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -428,6 +430,7 @@ public void testRunBeforeDataInserted() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -536,6 +539,7 @@ public void testIncrementalHandOff() throws Exception
             startPartitions,
             endPartitions,
             consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -663,6 +667,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
               startPartitions,
               endPartitions,
               consumerProps,
+              KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
               true,
               null,
               null,
@@ -803,6 +808,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception
             startPartitions,
             endPartitions,
             consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -905,6 +911,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
             startPartitions,
             endPartitions,
             consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -941,6 +948,7 @@ public void testRunWithMinimumMessageTime() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             DateTimes.of("2010"),
             null,
@@ -995,6 +1003,7 @@ public void testRunWithMaximumMessageTime() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             DateTimes.of("2010"),
@@ -1059,6 +1068,7 @@ public void testRunWithTransformSpec() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1119,6 +1129,7 @@ public void testRunOnNothing() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1160,6 +1171,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1212,6 +1224,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1267,6 +1280,7 @@ public void testReportParseExceptions() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 7L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1311,6 +1325,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1393,6 +1408,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1453,6 +1469,7 @@ public void testRunReplicas() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1467,6 +1484,7 @@ public void testRunReplicas() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1521,6 +1539,7 @@ public void testRunConflicting() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1535,6 +1554,7 @@ public void testRunConflicting() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 3L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1590,6 +1610,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             false,
             null,
             null,
@@ -1604,6 +1625,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 3L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             false,
             null,
             null,
@@ -1664,6 +1686,7 @@ public void testRunOneTaskTwoPartitions() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L, 1, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L, 1, 2L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1729,6 +1752,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1743,6 +1767,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(1, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(1, 1L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1799,6 +1824,7 @@ public void testRestore() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1836,6 +1862,7 @@ public void testRestore() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1888,6 +1915,7 @@ public void testRunWithPauseAndResume() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -1972,6 +2000,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -2011,6 +2040,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
@@ -2065,6 +2095,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
             null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index cbf25def6b4..3337faa080a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -71,6 +71,7 @@ public void testSerdeWithDefaults() throws Exception
     Assert.assertEquals(1, (int) config.getTaskCount());
     Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
+    Assert.assertEquals(100, config.getPollTimeout());
     Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
     Assert.assertEquals(false, config.isUseEarliestOffset());
@@ -90,6 +91,7 @@ public void testSerdeWithNonDefaults() throws Exception
                      + "  \"taskCount\": 9,\n"
                      + "  \"taskDuration\": \"PT30M\",\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+                     + "  \"pollTimeout\": 1000,\n"
                      + "  \"startDelay\": \"PT1M\",\n"
                      + "  \"period\": \"PT10S\",\n"
                      + "  \"useEarliestOffset\": true,\n"
@@ -113,6 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception
     Assert.assertEquals(9, (int) config.getTaskCount());
     Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
+    Assert.assertEquals(1000, config.getPollTimeout());
     Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
     Assert.assertEquals(true, config.isUseEarliestOffset());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 427b9d1742d..a08806acc17 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2793,6 +2793,7 @@ private KafkaSupervisor getSupervisor(
         taskCount,
         new Period(duration),
         consumerProperties,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
         useEarliestOffset,
@@ -2904,6 +2905,7 @@ private KafkaIndexTask createKafkaIndexTask(
             startPartitions,
             endPartitions,
             ImmutableMap.of("bootstrap.servers", kafkaHost),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             minimumMessageTime,
             maximumMessageTime,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org