You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/16 15:21:51 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12279: KAFKA-10199: Commit the restoration progress within StateUpdater

cadonna commented on code in PR #12279:
URL: https://github.com/apache/kafka/pull/12279#discussion_r899151497


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -290,6 +295,34 @@ private void addTaskToRestoredTasks(final StreamTask task) {
                 restoredActiveTasksLock.unlock();
             }
         }
+
+        private void maybeCommitRestoringTasks(final long now) {
+            final long elapsedMsSinceLastCommit = now - lastCommitMs;
+            if (elapsedMsSinceLastCommit > commitIntervalMs) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)",
+                        elapsedMsSinceLastCommit, commitIntervalMs);
+                }
+
+                for (final Task task : updatingTasks.values()) {
+                    // do not enforce checkpointing during restoration if its position has not advanced much

Review Comment:
   Is this a ToDo?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -290,6 +295,34 @@ private void addTaskToRestoredTasks(final StreamTask task) {
                 restoredActiveTasksLock.unlock();
             }
         }
+
+        private void maybeCommitRestoringTasks(final long now) {
+            final long elapsedMsSinceLastCommit = now - lastCommitMs;
+            if (elapsedMsSinceLastCommit > commitIntervalMs) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)",
+                        elapsedMsSinceLastCommit, commitIntervalMs);
+                }
+
+                for (final Task task : updatingTasks.values()) {
+                    // do not enforce checkpointing during restoration if its position has not advanced much
+                    commitTask(task, false);
+                }
+
+                lastCommitMs = now;
+            }
+        }
+
+        private void commitTask(final Task task, final boolean enforceCheckpoint) {
+            // prepare commit should not take any effect except a no-op verification
+            final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
+            if (!offsetAndMetadata.isEmpty()) {
+                throw new IllegalStateException("Task " + task.id() + " should not have any source offset " +
+                        "committable during restoration, but have " + offsetAndMetadata + " instead. " + BUG_ERROR_MESSAGE);
+            }
+

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -69,15 +77,27 @@ class DefaultStateUpdaterTest {
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
     private final static TaskId TASK_1_1 = new TaskId(1, 1);
 
+    private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
     private final ChangelogReader changelogReader = mock(ChangelogReader.class);
     private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };
-    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(changelogReader, offsetResetter, Time.SYSTEM);
+    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
 
     @AfterEach
     public void tearDown() {
         stateUpdater.shutdown(Duration.ofMinutes(1));
     }
 
+    private Properties configProps(final int commitInterval) {
+        return mkObjectProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueClassTestName(getClass())),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+                mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval),
+                // we need to make sure that transaction timeout is not lower than commit interval for EOS

Review Comment:
   Is this also a ToDo?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +638,77 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCommitTasksOnInterval() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCommitTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws Exception {
+        final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE));
+        final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+        try {
+            final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+            final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+            final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+            final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+            when(changelogReader.completedChangelogs())
+                    .thenReturn(Collections.emptySet());
+            when(changelogReader.allChangelogsCompleted())
+                    .thenReturn(false);
+
+            stateUpdater.add(task1);
+            stateUpdater.add(task2);
+            stateUpdater.add(task3);
+            stateUpdater.add(task4);
+
+            verifyNotCommitTasks(task1, task2, task3, task4);
+        } finally {
+            stateUpdater.shutdown(Duration.ofMinutes(1));
+        }
+    }
+
+    private void verifyCommitTasks(final boolean enforceCheckpoint, final Task... tasks) throws Exception {
+        waitForCondition(
+                () -> {
+                    for (final Task task : tasks) {
+                        verify(task, atLeast(1)).prepareCommit();
+                        verify(task, atLeast(1)).postCommit(enforceCheckpoint);
+                    }
+
+                    return true;
+                },
+                VERIFICATION_TIMEOUT,
+                "Did not auto commit all tasks within the given timeout!"
+        );

Review Comment:
   Did you consider using `verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1))` instead of `waitForCondition()`?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +638,77 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCommitTasksOnInterval() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCommitTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws Exception {
+        final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE));
+        final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+        try {
+            final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+            final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+            final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+            final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+            when(changelogReader.completedChangelogs())
+                    .thenReturn(Collections.emptySet());
+            when(changelogReader.allChangelogsCompleted())
+                    .thenReturn(false);
+
+            stateUpdater.add(task1);
+            stateUpdater.add(task2);
+            stateUpdater.add(task3);
+            stateUpdater.add(task4);
+
+            verifyNotCommitTasks(task1, task2, task3, task4);
+        } finally {
+            stateUpdater.shutdown(Duration.ofMinutes(1));
+        }
+    }
+
+    private void verifyCommitTasks(final boolean enforceCheckpoint, final Task... tasks) throws Exception {
+        waitForCondition(
+                () -> {
+                    for (final Task task : tasks) {
+                        verify(task, atLeast(1)).prepareCommit();
+                        verify(task, atLeast(1)).postCommit(enforceCheckpoint);
+                    }
+
+                    return true;
+                },
+                VERIFICATION_TIMEOUT,

Review Comment:
   Could you express the timeout in terms of the commit interval? 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -69,15 +77,27 @@ class DefaultStateUpdaterTest {
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
     private final static TaskId TASK_1_1 = new TaskId(1, 1);
 
+    private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
     private final ChangelogReader changelogReader = mock(ChangelogReader.class);
     private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };
-    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(changelogReader, offsetResetter, Time.SYSTEM);
+    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
 
     @AfterEach
     public void tearDown() {
         stateUpdater.shutdown(Duration.ofMinutes(1));
     }
 
+    private Properties configProps(final int commitInterval) {
+        return mkObjectProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueClassTestName(getClass())),

Review Comment:
   Why do we need this since we do not use an embedded Kafka cluster?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +638,77 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCommitTasksOnInterval() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCommitTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws Exception {
+        final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE));
+        final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+        try {
+            final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+            final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+            final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+            final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+            when(changelogReader.completedChangelogs())
+                    .thenReturn(Collections.emptySet());
+            when(changelogReader.allChangelogsCompleted())
+                    .thenReturn(false);
+
+            stateUpdater.add(task1);
+            stateUpdater.add(task2);
+            stateUpdater.add(task3);
+            stateUpdater.add(task4);
+
+            verifyNotCommitTasks(task1, task2, task3, task4);
+        } finally {
+            stateUpdater.shutdown(Duration.ofMinutes(1));
+        }
+    }
+
+    private void verifyCommitTasks(final boolean enforceCheckpoint, final Task... tasks) throws Exception {
+        waitForCondition(
+                () -> {
+                    for (final Task task : tasks) {
+                        verify(task, atLeast(1)).prepareCommit();
+                        verify(task, atLeast(1)).postCommit(enforceCheckpoint);
+                    }
+
+                    return true;
+                },
+                VERIFICATION_TIMEOUT,
+                "Did not auto commit all tasks within the given timeout!"
+        );
+    }
+
+    private void verifyNotCommitTasks(final Task... tasks) throws Exception {
+        for (final Task task : tasks) {
+            verify(task, times(0)).prepareCommit();
+            verify(task, times(0)).postCommit(true);
+            verify(task, times(0)).postCommit(false);

Review Comment:
   ```suggestion
               verify(task, never()).prepareCommit();
               verify(task, never()).postCommit(true);
               verify(task, never()).postCommit(false);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -290,6 +295,34 @@ private void addTaskToRestoredTasks(final StreamTask task) {
                 restoredActiveTasksLock.unlock();
             }
         }
+
+        private void maybeCommitRestoringTasks(final long now) {
+            final long elapsedMsSinceLastCommit = now - lastCommitMs;
+            if (elapsedMsSinceLastCommit > commitIntervalMs) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)",
+                        elapsedMsSinceLastCommit, commitIntervalMs);
+                }
+
+                for (final Task task : updatingTasks.values()) {
+                    // do not enforce checkpointing during restoration if its position has not advanced much
+                    commitTask(task, false);
+                }
+
+                lastCommitMs = now;
+            }
+        }
+
+        private void commitTask(final Task task, final boolean enforceCheckpoint) {
+            // prepare commit should not take any effect except a no-op verification
+            final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();

Review Comment:
   I am not sure I understand why we should commit. The task does not read from the input at this point. Wouldn't flushing the stores and writing the checkpoint file be enough? Can we somehow just flush the state store and write the checkpoint instead of calling the `*Commit()` methods? I think that would simplify the code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -286,6 +308,7 @@ public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Excep
         stateUpdater.add(task4);
 
         verifyRestoredActiveTasks(task2, task1);
+        verifyCommitTasks(true, task2, task1);

Review Comment:
   Shouldn't the offsets of standby tasks also be written to the checkpoint file? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org