You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/01/03 04:16:09 UTC
[incubator-druid] branch master updated: make kafka poll timeout
can be configured (#6773)
This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6761663 make kafka poll timeout can be configured (#6773)
6761663 is described below
commit 6761663509025c5c88275a9752c96d417f879abe
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Thu Jan 3 12:16:02 2019 +0800
make kafka poll timeout can be configured (#6773)
* make kafka poll timeout can be configured
* add doc
* rename DEFAULT_POLL_TIMEOUT to DEFAULT_POLL_TIMEOUT_MILLIS
---
.../development/extensions-core/kafka-ingestion.md | 1 +
.../IncrementalPublishingKafkaIndexTaskRunner.java | 2 +-
.../druid/indexing/kafka/KafkaIndexTask.java | 2 --
.../indexing/kafka/KafkaIndexTaskIOConfig.java | 11 ++++++++
.../indexing/kafka/LegacyKafkaIndexTaskRunner.java | 2 +-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 1 +
.../kafka/supervisor/KafkaSupervisorIOConfig.java | 11 ++++++++
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 31 ++++++++++++++++++++++
.../supervisor/KafkaSupervisorIOConfigTest.java | 3 +++
.../kafka/supervisor/KafkaSupervisorTest.java | 2 ++
10 files changed, 62 insertions(+), 4 deletions(-)
diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index aa771b7..af391bb 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -193,6 +193,7 @@ For Roaring bitmaps:
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.html) or String password.|yes|
+|`pollTimeout`|Long|The length of time to wait for the kafka consumer to poll records, in milliseconds|no (default == 100)|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against node failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index e8f6262..6424c29 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -100,7 +100,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
List<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList<>();
try {
- records = recordSupplier.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
+ records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 22ede08..950441c 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -41,13 +41,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
{
private static final String TYPE = "index_kafka";
- static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
private final KafkaIndexTaskIOConfig ioConfig;
private final ObjectMapper configMapper;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index fc5c287..af84bfc 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
import org.joda.time.DateTime;
@@ -32,6 +33,7 @@ import java.util.Map;
public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
{
private final Map<String, Object> consumerProperties;
+ private final long pollTimeout;
@JsonCreator
public KafkaIndexTaskIOConfig(
@@ -40,6 +42,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
@JsonProperty("startPartitions") SeekableStreamPartitions<Integer, Long> startPartitions,
@JsonProperty("endPartitions") SeekableStreamPartitions<Integer, Long> endPartitions,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+ @JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@@ -59,6 +62,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
);
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
+ this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
for (int partition : endPartitions.getPartitionSequenceNumberMap().keySet()) {
Preconditions.checkArgument(
@@ -77,6 +81,12 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
return consumerProperties;
}
+ @JsonProperty
+ public long getPollTimeout()
+ {
+ return pollTimeout;
+ }
+
@Override
public String toString()
{
@@ -86,6 +96,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
", startPartitions=" + getStartPartitions() +
", endPartitions=" + getEndPartitions() +
", consumerProperties=" + consumerProperties +
+ ", pollTimeout=" + pollTimeout +
", useTransaction=" + isUseTransaction() +
", minimumMessageTime=" + getMinimumMessageTime() +
", maximumMessageTime=" + getMaximumMessageTime() +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 53320c6..6086daf 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -393,7 +393,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
- records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
+ records = consumer.poll(task.getIOConfig().getPollTimeout());
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6f4434e..0e9dcac 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -209,6 +209,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
new SeekableStreamPartitions<>(kafkaIoConfig.getTopic(), startPartitions),
new SeekableStreamPartitions<>(kafkaIoConfig.getTopic(), endPartitions),
kafkaIoConfig.getConsumerProperties(),
+ kafkaIoConfig.getPollTimeout(),
true,
minimumMessageTime,
maximumMessageTime,
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 80b842b..ddd0f06 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -34,8 +34,10 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
public static final String KEY_PASSWORD_KEY = "ssl.key.password";
+ public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 100;
private final Map<String, Object> consumerProperties;
+ private final long pollTimeout;
private final boolean skipOffsetGaps;
@JsonCreator
@@ -45,6 +47,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+ @JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@@ -72,6 +75,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
);
+ this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
}
@@ -88,6 +92,12 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
}
@JsonProperty
+ public long getPollTimeout()
+ {
+ return pollTimeout;
+ }
+
+ @JsonProperty
public boolean isUseEarliestOffset()
{
return isUseEarliestSequenceNumber();
@@ -108,6 +118,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
", consumerProperties=" + consumerProperties +
+ ", pollTimeout=" + pollTimeout +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
", useEarliestOffset=" + isUseEarliestOffset() +
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 72041eb..f448777 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -386,6 +387,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -428,6 +430,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -536,6 +539,7 @@ public class KafkaIndexTaskTest
startPartitions,
endPartitions,
consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -663,6 +667,7 @@ public class KafkaIndexTaskTest
startPartitions,
endPartitions,
consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -803,6 +808,7 @@ public class KafkaIndexTaskTest
startPartitions,
endPartitions,
consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -905,6 +911,7 @@ public class KafkaIndexTaskTest
startPartitions,
endPartitions,
consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -941,6 +948,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
DateTimes.of("2010"),
null,
@@ -995,6 +1003,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
DateTimes.of("2010"),
@@ -1059,6 +1068,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1119,6 +1129,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1160,6 +1171,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1212,6 +1224,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1267,6 +1280,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 7L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1311,6 +1325,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1393,6 +1408,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1453,6 +1469,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1467,6 +1484,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1521,6 +1539,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1535,6 +1554,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 3L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1590,6 +1610,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
false,
null,
null,
@@ -1604,6 +1625,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 3L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
false,
null,
null,
@@ -1664,6 +1686,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L, 1, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L, 1, 2L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1729,6 +1752,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1743,6 +1767,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(1, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(1, 1L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1799,6 +1824,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1836,6 +1862,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1888,6 +1915,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -1972,6 +2000,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -2011,6 +2040,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@@ -2065,6 +2095,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index cbf25de..3337faa 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -71,6 +71,7 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(1, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
+ Assert.assertEquals(100, config.getPollTimeout());
Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
Assert.assertEquals(false, config.isUseEarliestOffset());
@@ -90,6 +91,7 @@ public class KafkaSupervisorIOConfigTest
+ " \"taskCount\": 9,\n"
+ " \"taskDuration\": \"PT30M\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ + " \"pollTimeout\": 1000,\n"
+ " \"startDelay\": \"PT1M\",\n"
+ " \"period\": \"PT10S\",\n"
+ " \"useEarliestOffset\": true,\n"
@@ -113,6 +115,7 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(9, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
+ Assert.assertEquals(1000, config.getPollTimeout());
Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
Assert.assertEquals(true, config.isUseEarliestOffset());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 427b9d1..a08806a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2793,6 +2793,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskCount,
new Period(duration),
consumerProperties,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
useEarliestOffset,
@@ -2904,6 +2905,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
startPartitions,
endPartitions,
ImmutableMap.of("bootstrap.servers", kafkaHost),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
minimumMessageTime,
maximumMessageTime,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org