You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/11/12 22:27:39 UTC
[incubator-druid] branch master updated: fix kafka indexing task
not processing through end offsets on publish, fixes #6602 (#6603)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new e326086 fix kafka indexing task not processing through end offsets on publish, fixes #6602 (#6603)
e326086 is described below
commit e3260866046015183ca91257376ad36f95269a27
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Mon Nov 12 14:27:32 2018 -0800
fix kafka indexing task not processing through end offsets on publish, fixes #6602 (#6603)
---
.../IncrementalPublishingKafkaIndexTaskRunner.java | 3 +
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 67 ++++++++++++++++++++++
2 files changed, 70 insertions(+)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index a66ad4c..57c666f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -430,6 +430,9 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
status = Status.PUBLISHING;
+ }
+
+ if (stopRequested.get()) {
break;
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 6fdfcd8..45c8a93 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -762,6 +762,73 @@ public class KafkaIndexTaskTest
}
@Test(timeout = 60_000L)
+ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
+ {
+ if (!isIncrementalHandoffSupported) {
+ return;
+ }
+
+ List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
+ new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0"))
+ );
+
+ final String baseSequenceName = "sequence0";
+ // as soon as any segment has more than one record, incremental publishing should happen
+ maxRowsPerSegment = 2;
+
+ // Insert data
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+ for (ProducerRecord<byte[], byte[]> record : records) {
+ kafkaProducer.send(record).get();
+ }
+ }
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+ consumerProps.put("max.poll.records", "1");
+
+ final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L));
+ final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 5L));
+ final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 7L));
+
+ final KafkaIndexTask task = createTask(
+ null,
+ new KafkaIOConfig(
+ 0,
+ baseSequenceName,
+ startPartitions,
+ endPartitions,
+ consumerProps,
+ true,
+ null,
+ null,
+ false
+ )
+ );
+ final ListenableFuture<TaskStatus> future = runTask(task);
+ while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+ Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets));
+
+ // actual checkpoint offset is 5, but simulating behavior of publishing set end offset call, to ensure this task
+ // will continue reading through the end offset of the checkpointed sequence
+ task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);
+
+ Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+ // processed count would be 5 if it stopped at its current offsets
+ Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());
+ Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+ Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+ }
+
+ @Test(timeout = 60_000L)
public void testRunWithMinimumMessageTime() throws Exception
{
final KafkaIndexTask task = createTask(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org