You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "abhishekagarwal87 (via GitHub)" <gi...@apache.org> on 2023/02/20 06:38:50 UTC

[GitHub] [druid] abhishekagarwal87 opened a new pull request, #13825: Fix infinite checkpointing between tasks and overlord

abhishekagarwal87 opened a new pull request, #13825:
URL: https://github.com/apache/druid/pull/13825

   If the intermediate handoff period is less than the task duration and there is no new data in the input topic, task will continuously checkpoint the same offsets again and again. This PR fixes that bug by resetting the checkpoint time even when the task receives the same end offset request again. 
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist below are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   This PR has:
   
   - [ ] been self-reviewed.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on pull request #13825: Fix infinite checkpointing between tasks and overlord

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 commented on PR #13825:
URL: https://github.com/apache/druid/pull/13825#issuecomment-1440057175

   Merging since failure is due to code coverage check


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 merged pull request #13825: Fix infinite checkpointing between tasks and overlord

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 merged PR #13825:
URL: https://github.com/apache/druid/pull/13825


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekrb19 commented on a diff in pull request #13825: Fix infinite checkpointing between tasks and overlord

Posted by "abhishekrb19 (via GitHub)" <gi...@apache.org>.
abhishekrb19 commented on code in PR #13825:
URL: https://github.com/apache/druid/pull/13825#discussion_r1113187325


##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java:
##########
@@ -932,6 +932,63 @@ public void testTimeBasedIncrementalHandOff() throws Exception
     );
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointResetWithSameEndOffsets() throws Exception
+  {
+    final String baseSequenceName = "sequence0";
+    // as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    intermediateHandoffPeriod = new Period().withMillis(10);
+
+    // Insert data
+    insertData();
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions = new SeekableStreamStartSequenceNumbers<>(
+        topic,
+        ImmutableMap.of(0, 0L, 1, 0L),
+        ImmutableSet.of()
+    );
+    final SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions = new SeekableStreamEndSequenceNumbers<>(
+        topic,
+        ImmutableMap.of(0, 2L, 1, 0L)
+    );
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT,
+            null
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // task will pause for checkpointing
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    long currentNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+    final Map<Integer, Long> nextEndOffsets = task.getRunner().getLastSequenceMetadata().getStartOffsets();
+    task.getRunner().setEndOffsets(nextEndOffsets, false);
+    long newNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+    Assert.assertTrue(
+        StringUtils.format(
+            "Old checkpoint time: [%s], new checkpoint time: [%s]",

Review Comment:
   `%d` since the timestamps here are longs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org