Posted to commits@druid.apache.org by gi...@apache.org on 2018/11/12 22:27:39 UTC

[incubator-druid] branch master updated: fix kafka indexing task not processing through end offsets on publish, fixes #6602 (#6603)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e326086  fix kafka indexing task not processing through end offsets on publish, fixes #6602 (#6603)
e326086 is described below

commit e3260866046015183ca91257376ad36f95269a27
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Mon Nov 12 14:27:32 2018 -0800

    fix kafka indexing task not processing through end offsets on publish, fixes #6602 (#6603)
---
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  3 +
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 67 ++++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index a66ad4c..57c666f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -430,6 +430,9 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
           // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
           if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
             status = Status.PUBLISHING;
+          }
+
+          if (stopRequested.get()) {
             break;
           }
 
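For context, here is a condensed, hypothetical sketch of the runner's read
loop before and after this change. The identifiers (stillReading,
stopRequested, sequences, status) appear in
IncrementalPublishingKafkaIndexTaskRunner, but the loop body is heavily
simplified for illustration and is not the actual code:

    // Simplified sketch of the read loop; not the real runner code.
    while (stillReading) {
      if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
        // No new sequences will arrive, so the task can flip to PUBLISHING.
        status = Status.PUBLISHING;
      }

      // Before this commit the `break` sat inside the branch above, so a
      // checkpointed final sequence exited the loop immediately and the task
      // could publish without consuming up to its end offsets. Now only an
      // explicit stop request exits early.
      if (stopRequested.get()) {
        break;
      }

      // ... poll the consumer and process records; stillReading becomes
      // false once every assigned partition reaches its end offset ...
    }
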
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 6fdfcd8..45c8a93 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -762,6 +762,73 @@ public class KafkaIndexTaskTest
   }
 
   @Test(timeout = 60_000L)
+  public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+
+    List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
+        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0"))
+    );
+
+    final String baseSequenceName = "sequence0";
+    // as soon as any segment has more than one record, incremental publishing should happen
+    maxRowsPerSegment = 2;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+    }
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L));
+    final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 5L));
+    final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 7L));
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint1.getPartitionOffsetMap(), currentOffsets);
+
+    // the actual checkpoint offset is 5, but we simulate the supervisor's publishing setEndOffsets call with a
+    // later offset, to ensure this task continues reading through the end offset of the checkpointed sequence
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    // the processed count would be 5 if the task had stopped at its current offsets
+    Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+  }
+
+  @Test(timeout = 60_000L)
   public void testRunWithMinimumMessageTime() throws Exception
   {
     final KafkaIndexTask task = createTask(
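
For readers skimming the diff, a hypothetical condensation of the scenario
the new test exercises (the runner calls and assertions are the real ones
used above; the narration around them is paraphrased). Setting
maxRowsPerSegment = 2 and max.poll.records = 1 appears intended to force
fine-grained incremental handoff, so the task pauses exactly at the
checkpoint:

    // Condensed from testIncrementalHandOffReadsThroughEndOffsets; not a
    // standalone program.
    Map<Integer, Long> current = task.getRunner().getCurrentOffsets();  // {0: 5}

    // Simulate the supervisor publishing an end offset one past the
    // checkpointed position, with finish = true:
    task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);

    // Before the fix the task stopped at its current offset and processed
    // only 5 records; with the fix it reads through the new end offset and
    // processes 6.
    Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());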

