Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/28 04:46:18 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12562: KAFKA-10199: Remove tasks from state updater on shutdown

guozhangwang commented on code in PR #12562:
URL: https://github.com/apache/kafka/pull/12562#discussion_r956661497


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -125,6 +125,80 @@ public void shouldShutdownStateUpdaterAndRestart() {
         verify(changelogReader, times(2)).clear();
     }
 
+    @Test
+    public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
+        stateUpdater.shutdown(Duration.ofMinutes(1));

Review Comment:
   Why do we call `shutdown` first? We should already shut down the state updater when tearing down each test.
   
   Ditto in `shouldRemoveUpdatingTasksOnShutdown`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -100,7 +105,8 @@ public void run() {
             } catch (final RuntimeException anyOtherException) {
                 handleRuntimeException(anyOtherException);
             } finally {
-                clear();
+                removeAddedTasksFromInputQueue();
+                removeUpdatingAndPausedTasks();

Review Comment:
   We no longer clear `restoredActiveTasks`. Is that intentional?
   
   Do we assume that, by the time the thread is shutting down, the stream thread no longer cares about any restored tasks? Or are you going to merge the restored tasks with the removed tasks soon anyway?
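
To make the question concrete, here is a minimal sketch of the cleanup shape in the diff. Everything in it is a hypothetical stand-in (the class `UpdaterShutdownSketch`, the string task IDs, and the plain collections are assumptions for illustration, not the actual `DefaultStateUpdater` fields). It only shows that a `finally` block with these two calls moves queued, updating, and paused tasks to the removed list while leaving the restored list as it was.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of the state updater's bookkeeping.
// Task IDs are plain strings; this is NOT the Kafka implementation.
class UpdaterShutdownSketch {
    final Queue<String> inputQueue = new ArrayDeque<>();        // tasks added but not yet picked up
    final List<String> updatingTasks = new ArrayList<>();       // tasks currently being restored
    final List<String> pausedTasks = new ArrayList<>();         // tasks whose restoration is paused
    final List<String> restoredActiveTasks = new ArrayList<>(); // tasks that finished restoring
    final List<String> removedTasks = new ArrayList<>();        // tasks handed back to the caller

    // Drain every queued task into the removed list.
    void removeAddedTasksFromInputQueue() {
        String taskId;
        while ((taskId = inputQueue.poll()) != null) {
            removedTasks.add(taskId);
        }
    }

    // Move all updating and paused tasks into the removed list.
    void removeUpdatingAndPausedTasks() {
        removedTasks.addAll(updatingTasks);
        removedTasks.addAll(pausedTasks);
        updatingTasks.clear();
        pausedTasks.clear();
    }

    // Stand-in for the thread's run(): the restoration loop is omitted,
    // only the cleanup in the finally block is modeled.
    void run() {
        try {
            // restoration loop would go here
        } finally {
            removeAddedTasksFromInputQueue();
            removeUpdatingAndPausedTasks();
            // restoredActiveTasks is deliberately left untouched in this sketch,
            // which is the behavior the review comment asks about.
        }
    }
}
```

In this model, any task that was in `restoredActiveTasks` before `run()` returns is still there afterwards, so whoever owns the updater would need to drain it separately if those tasks still matter.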



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -125,6 +125,80 @@ public void shouldShutdownStateUpdaterAndRestart() {
         verify(changelogReader, times(2)).clear();
     }
 
+    @Test
+    public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+        final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build();
+        final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        stateUpdater.pause(TASK_0_1);
+        stateUpdater.add(statelessTask);
+        stateUpdater.add(statefulTask);
+        stateUpdater.remove(TASK_1_1);
+        stateUpdater.add(standbyTask);
+        stateUpdater.resume(TASK_0_1);
+        verifyRemovedTasks();
+
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+
+        verifyRemovedTasks(statelessTask, statefulTask, standbyTask);
+    }
+
+    @Test
+    public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+        stateUpdater = new DefaultStateUpdater(new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, time);
+        final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(activeTask);
+        stateUpdater.add(standbyTask);
+        verifyUpdatingTasks(activeTask, standbyTask);
+        verifyRemovedTasks();
+
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+
+        verifyRemovedTasks(activeTask, standbyTask);
+        verify(activeTask).maybeCheckpoint(true);
+        verify(standbyTask).maybeCheckpoint(true);
+    }
+
+    @Test
+    public void shouldRemovePausedTasksOnShutdown() throws Exception {

Review Comment:
   It's possible that the state updater thread was not started when `shutdown` was called. In that case we do not need to do the latter, since it is still empty.
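
One way to read this suggestion, sketched under stated assumptions: "the latter" is taken here to mean removing the updating tasks, and the updater thread is modeled by a simple `started` flag rather than a real thread. The class `GuardedShutdownSketch` and all of its members are hypothetical names for illustration, not the actual `DefaultStateUpdater` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model; NOT the Kafka implementation.
class GuardedShutdownSketch {
    private boolean started = false;                       // stands in for "the updater thread exists"
    final Queue<String> inputQueue = new ArrayDeque<>();   // tasks added but not yet picked up
    final List<String> updatingTasks = new ArrayList<>();  // only filled once the updater is started
    final List<String> removedTasks = new ArrayList<>();   // tasks handed back on shutdown

    void add(String taskId) {
        inputQueue.add(taskId);
    }

    void start() {
        started = true;
    }

    // Stand-in for one iteration of the updater thread: only a started
    // updater moves tasks from the input queue into updatingTasks.
    void pollInputQueue() {
        if (!started) {
            return;
        }
        String taskId;
        while ((taskId = inputQueue.poll()) != null) {
            updatingTasks.add(taskId);
        }
    }

    void shutdown() {
        // The input queue can hold tasks whether or not the updater was started.
        String taskId;
        while ((taskId = inputQueue.poll()) != null) {
            removedTasks.add(taskId);
        }
        // Assumption: updatingTasks can only be non-empty if the updater ran,
        // so this step is skipped when it was never started.
        if (started) {
            removedTasks.addAll(updatingTasks);
            updatingTasks.clear();
            started = false;
        }
    }
}
```

With this shape, calling `shutdown()` on an updater that was never started still returns the queued tasks, and calling it a second time is a no-op.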


