Posted to commits@druid.apache.org by fj...@apache.org on 2018/11/13 22:31:15 UTC

[incubator-druid] branch 0.13.0-incubating updated: fix kafka indexing task not processing through end offsets on publish, fixes #6602 (#6603) (#6608)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.13.0-incubating by this push:
     new ee6139a  fix kafka indexing task not processing through end offsets on publish, fixes #6602 (#6603) (#6608)
ee6139a is described below

commit ee6139adbb1489f7ee54d17ed4f280387d7a68a6
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Tue Nov 13 14:31:10 2018 -0800

    fix kafka indexing task not processing through end offsets on publish, fixes #6602 (#6603) (#6608)
---
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  3 +
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 67 ++++++++++++++++++++++
 2 files changed, 70 insertions(+)
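
For context: before this change, the runner's main loop exited as soon as the task entered the PUBLISHING state, so a task whose final checkpoint had been set could stop consuming before reaching its assigned end offsets. The hunk below separates the two decisions. A simplified before/after sketch of the affected loop body (condensed from the runner code shown in the diff; surrounding polling logic omitted):

    // Before the fix: entering PUBLISHING also broke out of the read loop,
    // so the task could stop short of its end offsets.
    if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
      status = Status.PUBLISHING;
      break;
    }

    // After the fix: the status still flips to PUBLISHING, but the loop only
    // exits on an explicit stop request; otherwise the task keeps reading
    // from Kafka through its end offsets before publishing.
    if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
      status = Status.PUBLISHING;
    }
    if (stopRequested.get()) {
      break;
    }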

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index e815cfa..6cd88c7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -431,6 +431,9 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
           // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
           if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
             status = Status.PUBLISHING;
+          }
+
+          if (stopRequested.get()) {
             break;
           }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 6dd210a..ba8be70 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -762,6 +762,73 @@ public class KafkaIndexTaskTest
   }
 
   @Test(timeout = 60_000L)
+  public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+
+    List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
+        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0"))
+    );
+
+    final String baseSequenceName = "sequence0";
+    // with maxRowsPerSegment = 2, incremental publishing should kick in as soon as any segment has more than one record
+    maxRowsPerSegment = 2;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+    }
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L));
+    final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 5L));
+    final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 7L));
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint1.getPartitionOffsetMap(), currentOffsets);
+
+    // the actual checkpoint offset is 5, but we simulate the supervisor's publish-time setEndOffsets call to
+    // ensure the task continues reading through the end offset of the checkpointed sequence
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    // processed count would be 5 if the task stopped at its current offsets
+    Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+  }
+
+  @Test(timeout = 60_000L)
   public void testRunWithMinimumMessageTime() throws Exception
   {
     final KafkaIndexTask task = createTask(


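The new test exercises exactly this path: max.poll.records is set to 1 so records are consumed one at a time, the task pauses for a checkpoint at offset 5, and the test then simulates the supervisor's publish-time setEndOffsets(ImmutableMap.of(0, 6L), true) call. Before the fix, the task would have broken out of its read loop at offset 5 and published only 5 rows; with the fix it keeps reading through offset 6, which is what the final assertion on getRowIngestionMeters().getProcessed() verifies.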