You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/13 22:31:12 UTC

[GitHub] fjy closed pull request #6608: [Backport] fix kafka indexing task not processing through end offsets on publish

fjy closed pull request #6608: [Backport] fix kafka indexing task not processing through end offsets on publish
URL: https://github.com/apache/incubator-druid/pull/6608
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index e815cfa3016..6cd88c76569 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -431,6 +431,9 @@ public void run()
           // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
           if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
             status = Status.PUBLISHING;
+          }
+
+          if (stopRequested.get()) {
             break;
           }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 6dd210ae4c5..ba8be7086cc 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -761,6 +761,73 @@ public void testTimeBasedIncrementalHandOff() throws Exception
     Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
   }
 
+  @Test(timeout = 60_000L)
+  public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+
+    List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
+        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0"))
+    );
+
+    final String baseSequenceName = "sequence0";
+    // as soon as any segment has more than one record, incremental publishing should happen
+    maxRowsPerSegment = 2;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+    }
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L));
+    final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 5L));
+    final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 7L));
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets));
+
+    // actual checkpoint offset is 5, but simulating behavior of publishing set end offset call, to ensure this task
+    // will continue reading through the end offset of the checkpointed sequence
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    // processed count would be 5 if it stopped at it's current offsets
+    Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+  }
+
   @Test(timeout = 60_000L)
   public void testRunWithMinimumMessageTime() throws Exception
   {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org