You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2023/02/22 13:56:11 UTC

[druid] branch master updated: Fix infinite checkpointing between tasks and overlord (#13825)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d2dbb8b2c0 Fix infinite checkpointing between tasks and overlord (#13825)
d2dbb8b2c0 is described below

commit d2dbb8b2c02b3de9c7049d03bd59b03839aa209b
Author: Abhishek Agarwal <14...@users.noreply.github.com>
AuthorDate: Wed Feb 22 19:25:59 2023 +0530

    Fix infinite checkpointing between tasks and overlord (#13825)
    
    If the intermediate handoff period is less than the task duration and there is no new data in the input topic, task will continuously checkpoint the same offsets again and again. This PR fixes that bug by resetting the checkpoint time even when the task receives the same end offset request again.
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 57 ++++++++++++++++++++++
 .../SeekableStreamIndexTaskRunner.java             | 10 +++-
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 54f4523e02..0ba17cdc38 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -932,6 +932,63 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     );
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointResetWithSameEndOffsets() throws Exception
+  {
+    final String baseSequenceName = "sequence0";
+    // as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    intermediateHandoffPeriod = new Period().withMillis(10);
+
+    // Insert data
+    insertData();
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions = new SeekableStreamStartSequenceNumbers<>(
+        topic,
+        ImmutableMap.of(0, 0L, 1, 0L),
+        ImmutableSet.of()
+    );
+    final SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions = new SeekableStreamEndSequenceNumbers<>(
+        topic,
+        ImmutableMap.of(0, 2L, 1, 0L)
+    );
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT,
+            null
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // task will pause for checkpointing
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    long currentNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+    final Map<Integer, Long> nextEndOffsets = task.getRunner().getLastSequenceMetadata().getStartOffsets();
+    task.getRunner().setEndOffsets(nextEndOffsets, false);
+    long newNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+    Assert.assertTrue(
+        StringUtils.format(
+            "Old checkpoint time: [%d], new checkpoint time: [%d]",
+            currentNextCheckpointTime,
+            newNextCheckpointTime),
+        newNextCheckpointTime > currentNextCheckpointTime);
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+  }
+
   DataSourceMetadata newDataSchemaMetadata()
   {
     return metadataStorageCoordinator.retrieveDataSourceMetadata(NEW_DATA_SCHEMA.getDataSource());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e509699d5c..794fab2ec5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1222,7 +1222,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     sequences.add(sequenceMetadata);
   }
 
-  private SequenceMetadata<PartitionIdType, SequenceOffsetType> getLastSequenceMetadata()
+  @VisibleForTesting
+  public SequenceMetadata<PartitionIdType, SequenceOffsetType> getLastSequenceMetadata()
   {
     Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
     return sequences.get(sequences.size() - 1);
@@ -1651,6 +1652,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
              && !finish)
             || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
           log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequenceNumbers);
+          resetNextCheckpointTime();
           resume();
           return Response.ok(sequenceNumbers).build();
         } else if (latestSequence.isCheckpointed()) {
@@ -1872,6 +1874,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return startTime;
   }
 
+  @VisibleForTesting
+  public long getNextCheckpointTime()
+  {
+    return nextCheckpointTime;
+  }
+
   /**
    * This method does two things:
    * <p>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org