Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/07 01:29:47 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #12386: KAFKA-10199: Add PAUSE in task updater

guozhangwang opened a new pull request, #12386:
URL: https://github.com/apache/kafka/pull/12386

   1. Add a pause action to the state updater.
   2. When removing a task, also look for it among the paused tasks, in addition to the updating tasks.
   3. I also realized that we do not check whether a task with the same ID has already been added, so this PR adds that check as well.
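The three changes listed above can be illustrated with a small, single-threaded model. This is a hypothetical sketch: `SimpleStateUpdater`, its string task IDs, and its method names are illustrative, not Kafka's `DefaultStateUpdater` API, which processes actions asynchronously on its own thread and also unregisters changelog partitions. Checking both the updating and the paused tasks for duplicate IDs is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of the add/pause/remove bookkeeping.
// Changelog registration and threading are deliberately left out.
class SimpleStateUpdater {
    final Map<String, String> updatingTasks = new HashMap<>(); // task ID -> description
    final Map<String, String> pausedTasks = new HashMap<>();
    final Set<String> removedTasks = new HashSet<>();

    // Change 3: reject a task whose ID is already known (assumed to cover both maps).
    void add(final String taskId, final String description) {
        if (updatingTasks.containsKey(taskId) || pausedTasks.containsKey(taskId)) {
            throw new IllegalStateException("Task " + taskId + " was already added");
        }
        updatingTasks.put(taskId, description);
    }

    // Change 1: move an updating task to the paused tasks; unknown IDs are ignored.
    void pause(final String taskId) {
        final String task = updatingTasks.remove(taskId);
        if (task != null) {
            pausedTasks.put(taskId, task);
        }
    }

    // Change 2: look among the updating tasks first, then among the paused tasks.
    void remove(final String taskId) {
        if (updatingTasks.remove(taskId) != null || pausedTasks.remove(taskId) != null) {
            removedTasks.add(taskId);
        }
    }
}
```

For example, adding `"0_0"`, pausing it, and then removing it leaves `"0_0"` only in `removedTasks`.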
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang merged pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang merged PR #12386:
URL: https://github.com/apache/kafka/pull/12386




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923916317


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -498,6 +575,66 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exceptio
         verifyRestoredActiveTasks();
     }
 
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks() throws Exception {

Review Comment:
   Ack.





[GitHub] [kafka] cadonna commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923506717


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -498,6 +575,66 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exceptio
         verifyRestoredActiveTasks();
     }
 
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.pause(task.id());
+        stateUpdater.pause(controlTask.id());
+
+        verifyPausedTasks(controlTask);
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromFailedTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskFromFailedTasks(task);
+    }
+
+    @Test
+    public void shouldNotPauseStandbyTaskFromFailedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskFromFailedTasks(task);
+    }
+
+    private void shouldNotPauseTaskFromFailedTasks(final Task task) throws Exception {
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StreamsException streamsException = new StreamsException("Something happened", task.id());
+        when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        final Map<TaskId, Task> updatingTasks = mkMap(
+                mkEntry(task.id(), task),
+                mkEntry(controlTask.id(), controlTask)
+        );
+        doThrow(streamsException)
+                .doNothing()
+                .when(changelogReader).restore(updatingTasks);

Review Comment:
   ```suggestion
               .doNothing()
               .when(changelogReader).restore(updatingTasks);
   ```





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923900844


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -983,6 +1120,27 @@ private void verifyRemovedTasks(final Task... tasks) throws Exception {
         }
     }
 
+    private void verifyPausedTasks(final Task... tasks) throws Exception {
+        if (tasks.length == 0) {
+            assertTrue(stateUpdater.getPausedTasks().isEmpty());
+        } else {
+            final Set<Task> expectedPausedTasks = mkSet(tasks);
+            final Set<Task> pausedTasks = new HashSet<>();
+            waitForCondition(
+                    () -> {
+                        pausedTasks.addAll(stateUpdater.getPausedTasks());
+                        return pausedTasks.containsAll(expectedPausedTasks)
+                                && pausedTasks.size() == expectedPausedTasks.size();
+                    },
+                    VERIFICATION_TIMEOUT,
+                    "Did not get all paused task within the given timeout!"
+            );
+            assertTrue(pausedTasks.stream()
+                .allMatch(task -> task.isActive() && task.state() == State.RESTORING
+                    || !task.isActive() && task.state() == State.RUNNING));

Review Comment:
   Ack.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923908100


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -143,6 +142,27 @@ public void shouldThrowIfStandbyTaskNotInStateRunning() {
         shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
     }
 
+    @Test
+    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
+        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
     private void shouldThrowIfTaskNotInGivenState(final Task task, final State correctState) {

Review Comment:
   Ack.





[GitHub] [kafka] cadonna commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923506009


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -983,6 +1120,27 @@ private void verifyRemovedTasks(final Task... tasks) throws Exception {
         }
     }
 
+    private void verifyPausedTasks(final Task... tasks) throws Exception {
+        if (tasks.length == 0) {
+            assertTrue(stateUpdater.getPausedTasks().isEmpty());
+        } else {
+            final Set<Task> expectedPausedTasks = mkSet(tasks);
+            final Set<Task> pausedTasks = new HashSet<>();
+            waitForCondition(
+                    () -> {
+                        pausedTasks.addAll(stateUpdater.getPausedTasks());
+                        return pausedTasks.containsAll(expectedPausedTasks)
+                                && pausedTasks.size() == expectedPausedTasks.size();
+                    },
+                    VERIFICATION_TIMEOUT,
+                    "Did not get all paused task within the given timeout!"
+            );

Review Comment:
   ```suggestion
               waitForCondition(
                   () -> {
                       pausedTasks.addAll(stateUpdater.getPausedTasks());
                       return pausedTasks.containsAll(expectedPausedTasks)
                               && pausedTasks.size() == expectedPausedTasks.size();
                   },
                   VERIFICATION_TIMEOUT,
                   "Did not get all paused task within the given timeout!"
               );
   ```





[GitHub] [kafka] cadonna commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923506434


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -498,6 +575,66 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exceptio
         verifyRestoredActiveTasks();
     }
 
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.pause(task.id());
+        stateUpdater.pause(controlTask.id());
+
+        verifyPausedTasks(controlTask);
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromFailedTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskFromFailedTasks(task);
+    }
+
+    @Test
+    public void shouldNotPauseStandbyTaskFromFailedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskFromFailedTasks(task);
+    }
+
+    private void shouldNotPauseTaskFromFailedTasks(final Task task) throws Exception {
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StreamsException streamsException = new StreamsException("Something happened", task.id());
+        when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        final Map<TaskId, Task> updatingTasks = mkMap(
+                mkEntry(task.id(), task),
+                mkEntry(controlTask.id(), controlTask)

Review Comment:
   ```suggestion
               mkEntry(task.id(), task),
               mkEntry(controlTask.id(), controlTask)
   ```





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923898981


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java:
##########
@@ -47,6 +48,11 @@ public static TaskAndAction createRemoveTask(final TaskId taskId) {
         return new TaskAndAction(null, taskId, Action.REMOVE);
     }
 
+    public static TaskAndAction createPauseTask(final TaskId taskId) {
+        Objects.requireNonNull(taskId, "Task ID of task to pause is null!");
+        return new TaskAndAction(null, taskId, Action.PAUSE);
+    }
+

Review Comment:
   Ack.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923903522


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -1012,6 +1170,22 @@ private void verifyExceptionsAndFailedTasks(final ExceptionAndTasks... exception
         );
     }
 
+    private void verifyFailedTasks(final Task... tasks) throws Exception {
+        final List<Task> expectedFailedTasks = Arrays.asList(tasks);
+        final Set<Task> failedTasks = new HashSet<>();
+        waitForCondition(
+                () -> {
+                    for (final ExceptionAndTasks exceptionsAndTasks : stateUpdater.getExceptionsAndFailedTasks()) {
+                        failedTasks.addAll(exceptionsAndTasks.getTasks());
+                    }
+                    return failedTasks.containsAll(expectedFailedTasks)
+                            && failedTasks.size() == expectedFailedTasks.size();
+                },
+                VERIFICATION_TIMEOUT,
+                "Did not get all exceptions and failed tasks within the given timeout!"
+        );
+    }
+

Review Comment:
   `verifyExceptionsAndFailedTasks` verifies that the exception object is exactly the same instance, but here we do not control the creation of the exception thrown for the illegal state, so we cannot verify an exact match.
   
   I augmented this function to also verify the exception class, which differentiates it from the other `verifyExceptionsAndFailedTasks`.
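When the test does not create the exception instance, one option is to match on the exception's class while still requiring the exact set of failed tasks. The sketch below assumes hypothetical types: `ExceptionAndTaskIds` is a stand-in for the (exception, failed tasks) pairs, not Kafka's `ExceptionAndTasks`, and task IDs are plain strings.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the (exception, failed tasks) pairs reported by the updater.
class ExceptionAndTaskIds {
    final RuntimeException exception;
    final Set<String> taskIds;

    ExceptionAndTaskIds(final RuntimeException exception, final Set<String> taskIds) {
        this.exception = exception;
        this.taskIds = taskIds;
    }
}

class FailedTaskVerifier {
    // Matches the exception by class (isInstance) instead of identity, because the
    // test does not construct the exception itself; the failed task IDs must match exactly.
    static boolean matches(final Collection<ExceptionAndTaskIds> reported,
                           final Class<? extends RuntimeException> expectedClass,
                           final Set<String> expectedTaskIds) {
        final Set<String> failed = new HashSet<>();
        for (final ExceptionAndTaskIds reportedPair : reported) {
            if (!expectedClass.isInstance(reportedPair.exception)) {
                return false; // an unexpected exception type was reported
            }
            failed.addAll(reportedPair.taskIds);
        }
        return failed.equals(expectedTaskIds);
    }
}
```

Checking the class as well as the tasks avoids accepting an unrelated exception that happens to fail the same tasks.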





[GitHub] [kafka] guozhangwang commented on pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12386:
URL: https://github.com/apache/kafka/pull/12386#issuecomment-1188363535

   > Currently, it is not clear to me if we also need to consider the case where a task is paused before it is added to the state updater? Such a situation might happen when a Kafka Streams client is paused before it is started, which is legit according to KIP-834.
   
   Yes, this is a good question. Currently I'm thinking of not complicating the logic of the state updater and instead handling such cases outside of it, i.e. in the task manager: while tasks have not been created yet, we keep track of the pause status inside the task manager (potentially in the pending-tasks structure as well).
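The idea of keeping the pause status in the task manager can be sketched as follows. This is a hypothetical illustration, not Kafka's `TaskManager`: `PendingPauseTracker`, its methods, and the callback that forwards the pause to the state updater are assumed names. The sketch keeps the pause status until an explicit resume, on the assumption that a paused client stays paused after its tasks are created.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical task-manager-side bookkeeping: remember pause requests for tasks
// that may not exist yet, and forward the pause when such a task is handed to
// the state updater.
class PendingPauseTracker {
    private final Set<String> pausedTaskIds = new HashSet<>();

    void requestPause(final String taskId) {
        pausedTaskIds.add(taskId);
    }

    void requestResume(final String taskId) {
        pausedTaskIds.remove(taskId);
    }

    // Called right after a task is added to the state updater; returns true if
    // a pause was forwarded through the given callback.
    boolean onTaskAdded(final String taskId, final Consumer<String> pauseInStateUpdater) {
        if (pausedTaskIds.contains(taskId)) {
            pauseInStateUpdater.accept(taskId);
            return true;
        }
        return false;
    }
}
```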




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923915840


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -428,6 +456,55 @@ private void shouldRemoveStatefulTask(final Task task) throws Exception {
         verify(changelogReader).unregister(task.changelogPartitions());
     }
 
+    @Test
+    public void shouldPauseActiveStatefulTask() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, never()).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+
+        stateUpdater.start();
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+
+        stateUpdater.pause(task1.id());
+
+        verifyPausedTasks(task1);
+        verifyCheckpointTasks(true, task1);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks(task2);
+        verifyExceptionsAndFailedTasks();
+        verify(changelogReader, times(1)).enforceRestoreActive();
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    private void shouldPauseStatefulTask(final Task task) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyCheckpointTasks(true, task);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {

Review Comment:
   Ack.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923919872


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -94,6 +94,19 @@ public int hashCode() {
      */
     void remove(final TaskId taskId);
 
+    /**
+     * Pause a task (active or standby) from restoring in the state updater.
+     *
+     * This method does not block until the task is paused.
+     *
+     * The task to be paused is not removed from the restored active tasks and the failed tasks.

Review Comment:
   Ack.





[GitHub] [kafka] guozhangwang commented on pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12386:
URL: https://github.com/apache/kafka/pull/12386#issuecomment-1188443250

   @cadonna Thanks for the detailed review! I've incorporated your comments into this PR and merged it.
   
   Please feel free to leave more comments if you like, and I will rebase and incorporate them in the next PR: https://github.com/apache/kafka/pull/12387




[GitHub] [kafka] cadonna commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in task updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923282700


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -267,8 +276,31 @@ private void removeTask(final TaskId taskId) {
                 transitToUpdateStandbysIfOnlyStandbysLeft();
                 log.debug((task.isActive() ? "Active" : "Standby")
                     + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+            } else if (pausedTasks.containsKey(taskId)) {
+                task = pausedTasks.get(taskId);
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                pausedTasks.remove(taskId);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the paused tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not updating or paused.");
+            }
+        }
+
+        private void pauseTask(final TaskId taskId) {
+            final Task task = updatingTasks.get(taskId);
+            if (task != null) {
+                // do not need to unregister changelog partitions for paused tasks

Review Comment:
   Do we really need this comment?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java:
##########
@@ -47,6 +48,11 @@ public static TaskAndAction createRemoveTask(final TaskId taskId) {
         return new TaskAndAction(null, taskId, Action.REMOVE);
     }
 
+    public static TaskAndAction createPauseTask(final TaskId taskId) {
+        Objects.requireNonNull(taskId, "Task ID of task to pause is null!");
+        return new TaskAndAction(null, taskId, Action.PAUSE);
+    }
+

Review Comment:
   Could you please add a test in `TaskAndActionTest` for this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java:
##########
@@ -55,7 +61,7 @@ public Task getTask() {
     }
 
     public TaskId getTaskId() {
-        if (action != Action.REMOVE) {
+        if (action != Action.REMOVE && action != Action.PAUSE) {

Review Comment:
   Could you please add a test in `TaskAndActionTest` for this?
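A test for the new PAUSE action would likely check that `createPauseTask` rejects a null ID, that `getTaskId()` returns the ID for PAUSE, and that `getTask()` still throws for PAUSE. The sketch below uses a hypothetical, simplified copy (`MiniTaskAndAction`, string IDs, `Object` tasks) so it runs without Kafka; the exception messages for `getTaskId()` and `getTask()` are assumptions.

```java
import java.util.Objects;

// Hypothetical, simplified copy of TaskAndAction to show what a PAUSE test could check.
class MiniTaskAndAction {
    enum Action { ADD, REMOVE, PAUSE }

    private final Object task;
    private final String taskId;
    private final Action action;

    private MiniTaskAndAction(final Object task, final String taskId, final Action action) {
        this.task = task;
        this.taskId = taskId;
        this.action = action;
    }

    static MiniTaskAndAction createAddTask(final Object task) {
        Objects.requireNonNull(task, "Task to add is null!");
        return new MiniTaskAndAction(task, null, Action.ADD);
    }

    static MiniTaskAndAction createPauseTask(final String taskId) {
        Objects.requireNonNull(taskId, "Task ID of task to pause is null!");
        return new MiniTaskAndAction(null, taskId, Action.PAUSE);
    }

    Action getAction() {
        return action;
    }

    // Mirrors the changed guard in the diff: REMOVE and PAUSE carry a task ID.
    String getTaskId() {
        if (action != Action.REMOVE && action != Action.PAUSE) {
            throw new IllegalStateException("Action type " + action + " cannot have a task ID!");
        }
        return taskId;
    }

    // Only ADD carries a task object.
    Object getTask() {
        if (action != Action.ADD) {
            throw new IllegalStateException("Action type " + action + " cannot have a task!");
        }
        return task;
    }
}
```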



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -94,6 +94,19 @@ public int hashCode() {
      */
     void remove(final TaskId taskId);
 
+    /**
+     * Pause a task (active or standby) from restoring in the state updater.
+     *
+     * This method does not block until the task is paused.
+     *
+     * The task to be paused is not removed from the restored active tasks and the failed tasks.

Review Comment:
   Could you also add that removed tasks are not paused?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -498,6 +575,66 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exceptio
         verifyRestoredActiveTasks();
     }
 
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks() throws Exception {

Review Comment:
   `shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks` -> `shouldNotPauseActiveStatefulTaskInRestoredActiveTasks`
   
   The same applies to below test methods.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -143,6 +142,27 @@ public void shouldThrowIfStandbyTaskNotInStateRunning() {
         shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
     }
 
+    @Test
+    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
+        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+

Review Comment:
   Just for completeness, could you also add the case where a standby task is added and then an active task with the same ID is added?
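If tasks are keyed only by ID, the duplicate check should fire for every ordering of active and standby tasks, including standby first and active second. The hypothetical registry below (`TaskRegistry`, `Kind`, and `throwsOnDuplicate` are illustrative names) uses `Map.putIfAbsent` to detect an existing entry, which lets a test enumerate all four combinations.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry keyed only by task ID, so an active and a standby task
// with the same ID collide regardless of which one is added first.
class TaskRegistry {
    enum Kind { ACTIVE, STANDBY }

    private final Map<String, Kind> tasks = new HashMap<>();

    void add(final String taskId, final Kind kind) {
        final Kind existing = tasks.putIfAbsent(taskId, kind);
        if (existing != null) {
            throw new IllegalStateException(existing + " task " + taskId
                + " already exists; cannot add a " + kind + " task with the same ID");
        }
    }

    // Adds two tasks with the same ID in the given order and reports whether the second add threw.
    static boolean throwsOnDuplicate(final Kind first, final Kind second) {
        final TaskRegistry registry = new TaskRegistry();
        registry.add("0_0", first);
        try {
            registry.add("0_0", second);
            return false;
        } catch (final IllegalStateException e) {
            return true;
        }
    }
}
```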



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -267,8 +276,31 @@ private void removeTask(final TaskId taskId) {
                 transitToUpdateStandbysIfOnlyStandbysLeft();
                 log.debug((task.isActive() ? "Active" : "Standby")
                     + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+            } else if (pausedTasks.containsKey(taskId)) {
+                task = pausedTasks.get(taskId);
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                pausedTasks.remove(taskId);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the paused tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not updating or paused.");
+            }

Review Comment:
   I could not find a test that verifies this branch. If I did not miss it, could you please add one?
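A test for the final `else` branch would remove a task ID that is neither updating nor paused and assert that nothing changes. The hypothetical model below (`RemoveTaskSketch` and its string return values are illustrative, not Kafka code) mirrors the three branches of `removeTask()` from the diff and reports which branch was taken, so a test can assert on it directly.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the three branches of removeTask() shown in the diff.
class RemoveTaskSketch {
    final Map<String, String> updatingTasks = new HashMap<>();
    final Map<String, String> pausedTasks = new HashMap<>();
    final Set<String> removedTasks = new HashSet<>();

    // Returns which branch was taken so that a test can assert on it.
    String removeTask(final String taskId) {
        if (updatingTasks.remove(taskId) != null) {
            removedTasks.add(taskId);
            return "updating";
        } else if (pausedTasks.remove(taskId) != null) {
            removedTasks.add(taskId);
            return "paused";
        } else {
            // Neither updating nor paused: no state changes (the real code only logs at debug level).
            return "none";
        }
    }
}
```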



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -983,6 +1120,27 @@ private void verifyRemovedTasks(final Task... tasks) throws Exception {
         }
     }
 
+    private void verifyPausedTasks(final Task... tasks) throws Exception {
+        if (tasks.length == 0) {
+            assertTrue(stateUpdater.getPausedTasks().isEmpty());
+        } else {
+            final Set<Task> expectedPausedTasks = mkSet(tasks);
+            final Set<Task> pausedTasks = new HashSet<>();
+            waitForCondition(
+                    () -> {
+                        pausedTasks.addAll(stateUpdater.getPausedTasks());
+                        return pausedTasks.containsAll(expectedPausedTasks)
+                                && pausedTasks.size() == expectedPausedTasks.size();
+                    },
+                    VERIFICATION_TIMEOUT,
+                    "Did not get all paused task within the given timeout!"
+            );
+            assertTrue(pausedTasks.stream()
+                .allMatch(task -> task.isActive() && task.state() == State.RESTORING
+                    || !task.isActive() && task.state() == State.RUNNING));

Review Comment:
   This verification does not help test correctness, since the tasks are mocks whose return values are specified by the test code itself. I removed these verifications from the other verification methods in one of my recent PRs. Could you please also remove this one?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -1012,6 +1170,22 @@ private void verifyExceptionsAndFailedTasks(final ExceptionAndTasks... exception
         );
     }
 
+    private void verifyFailedTasks(final Task... tasks) throws Exception {
+        final List<Task> expectedFailedTasks = Arrays.asList(tasks);
+        final Set<Task> failedTasks = new HashSet<>();
+        waitForCondition(
+                () -> {
+                    for (final ExceptionAndTasks exceptionsAndTasks : stateUpdater.getExceptionsAndFailedTasks()) {
+                        failedTasks.addAll(exceptionsAndTasks.getTasks());
+                    }
+                    return failedTasks.containsAll(expectedFailedTasks)
+                            && failedTasks.size() == expectedFailedTasks.size();
+                },
+                VERIFICATION_TIMEOUT,
+                "Did not get all exceptions and failed tasks within the given timeout!"
+        );
+    }
+

Review Comment:
   Why do you introduce this verification method? For testing exceptions there is already `verifyExceptionsAndFailedTasks()`. In any case, you should verify the thrown exception as well as the failed tasks; otherwise any exception, not only the expected one, satisfies this verification.
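   A sketch of what checking both halves could look like. `ExceptionAndTasks` here is a simplified, hypothetical class (the real one exposes `getTasks()`, as the diff shows, but its other accessors are not assumed), and `FailedTaskVerification.matches` is an illustrative helper. The point is that the exception class and message are compared alongside the failed task ids, so an unrelated exception no longer passes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified pairing of an exception with the tasks it failed.
final class ExceptionAndTasks {
    final RuntimeException exception;
    final Set<String> taskIds;

    ExceptionAndTasks(final RuntimeException exception, final String... taskIds) {
        this.exception = exception;
        this.taskIds = new HashSet<>(Arrays.asList(taskIds));
    }
}

final class FailedTaskVerification {
    // Returns true only if, pair by pair and in order, the exception class, the
    // exception message, and the failed task ids all match the expected ones.
    static boolean matches(final List<ExceptionAndTasks> expected, final List<ExceptionAndTasks> actual) {
        if (expected.size() != actual.size()) {
            return false;
        }
        for (int i = 0; i < expected.size(); i++) {
            final ExceptionAndTasks e = expected.get(i);
            final ExceptionAndTasks a = actual.get(i);
            if (e.exception.getClass() != a.exception.getClass()
                || !Objects.equals(e.exception.getMessage(), a.exception.getMessage())
                || !e.taskIds.equals(a.taskIds)) {
                return false;
            }
        }
        return true;
    }
}
```

   Comparing only the task ids would accept the `IllegalArgumentException` case below; comparing the exception too rejects it.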



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -143,6 +142,27 @@ public void shouldThrowIfStandbyTaskNotInStateRunning() {
         shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
     }
 
+    @Test
+    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
+        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
     private void shouldThrowIfTaskNotInGivenState(final Task task, final State correctState) {

Review Comment:
   nit: Could you please move this method below `shouldThrowIfStandbyTaskNotInStateRunning()`? That way, methods that are related to each other are clustered.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -428,6 +456,55 @@ private void shouldRemoveStatefulTask(final Task task) throws Exception {
         verify(changelogReader).unregister(task.changelogPartitions());
     }
 
+    @Test
+    public void shouldPauseActiveStatefulTask() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, never()).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+
+        stateUpdater.start();
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+
+        stateUpdater.pause(task1.id());
+
+        verifyPausedTasks(task1);
+        verifyCheckpointTasks(true, task1);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks(task2);
+        verifyExceptionsAndFailedTasks();
+        verify(changelogReader, times(1)).enforceRestoreActive();
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    private void shouldPauseStatefulTask(final Task task) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyCheckpointTasks(true, task);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {

Review Comment:
   nit: Could you please move all `shouldNotRemove...` tests before `shouldPauseActiveStatefulTask()` so that all tests related to remove are together and all tests related to pause are together?  



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -94,6 +94,19 @@ public int hashCode() {
      */
     void remove(final TaskId taskId);
 
+    /**
+     * Pause a task (active or standby) from restoring in the state updater.
+     *
+     * This method does not block until the task is paused.
+     *
+     * The task to be paused is not removed from the restored active tasks and the failed tasks.

Review Comment:
   Shouldn't this be:
   ```
   Restored tasks and failed tasks are not paused.
   ```
   
   or 
   
   ```
   Tasks in restored active tasks and failed tasks are not paused.
   ```
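   Read literally, the second phrasing describes a guard in `pause`: only a task that is still updating moves to the paused tasks, while restored active tasks and failed tasks stay where they are. A minimal sketch under that reading; `PauseSemantics` and the string task ids are hypothetical stand-ins, not the state updater's actual fields.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: each task id lives in exactly one of these collections.
class PauseSemantics {
    final Map<String, String> updatingTasks = new HashMap<>();
    final Map<String, String> pausedTasks = new HashMap<>();
    final Set<String> restoredActiveTasks = new HashSet<>();
    final Set<String> failedTasks = new HashSet<>();

    // Only updating tasks move to the paused tasks; pausing a restored or
    // failed task is a no-op.
    void pause(final String taskId) {
        final String task = updatingTasks.remove(taskId);
        if (task != null) {
            pausedTasks.put(taskId, task);
        }
    }
}
```

   This matches what `shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks()` checks: only the control task ends up paused.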



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -983,6 +1120,27 @@ private void verifyRemovedTasks(final Task... tasks) throws Exception {
         }
     }
 
+    private void verifyPausedTasks(final Task... tasks) throws Exception {

Review Comment:
   We also need to verify the paused tasks in the existing test methods. For instance, `shouldImmediatelyAddStatelessTasksToRestoredTasks()` should include the call `verifyPausedTasks()` to verify that a stateless task is not added to the paused tasks. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -498,6 +575,66 @@ private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exceptio
         verifyRestoredActiveTasks();
     }
 
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.pause(task.id());
+        stateUpdater.pause(controlTask.id());
+
+        verifyPausedTasks(controlTask);
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotPauseActiveStatefulTaskFromFailedTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskFromFailedTasks(task);
+    }
+
+    @Test
+    public void shouldNotPauseStandbyTaskFromFailedTasks() throws Exception {

Review Comment:
   Could you please add tests `shouldNotPauseActiveStatefulTaskInRemovedTasks()` and `shouldNotPauseStandbyTaskInRemovedTasks()`?
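   These tests depend on ordering: the task is removed first and then paused. The sketch below assumes actions are applied in the order they were submitted. `QueuedUpdater` and its `Action` enum are hypothetical, only loosely modelled on `TaskAndAction` with its `REMOVE` and `PAUSE` actions. It shows that a `PAUSE` arriving after a `REMOVE` finds nothing to pause.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical action queue, loosely modelled on TaskAndAction; not the Kafka classes.
class QueuedUpdater {
    enum Action { ADD, REMOVE, PAUSE }

    static final class TaskAndAction {
        final String taskId;
        final Action action;

        TaskAndAction(final String taskId, final Action action) {
            this.taskId = taskId;
            this.action = action;
        }
    }

    final Queue<TaskAndAction> actions = new ArrayDeque<>();
    final Map<String, String> updatingTasks = new HashMap<>();
    final Map<String, String> pausedTasks = new HashMap<>();
    final List<String> removedTasks = new ArrayList<>();

    void enqueue(final String taskId, final Action action) {
        actions.add(new TaskAndAction(taskId, action));
    }

    // Drains the queue in FIFO order.
    void processActions() {
        TaskAndAction next;
        while ((next = actions.poll()) != null) {
            switch (next.action) {
                case ADD:
                    updatingTasks.put(next.taskId, next.taskId);
                    break;
                case REMOVE: {
                    String task = updatingTasks.remove(next.taskId);
                    if (task == null) {
                        task = pausedTasks.remove(next.taskId);
                    }
                    if (task != null) {
                        removedTasks.add(task);
                    }
                    break;
                }
                case PAUSE: {
                    // A removed task is no longer updating, so there is nothing to pause.
                    final String task = updatingTasks.remove(next.taskId);
                    if (task != null) {
                        pausedTasks.put(next.taskId, task);
                    }
                    break;
                }
            }
        }
    }
}
```

   The same sequence with an active or a standby task is what the two requested tests would exercise against the real updater.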



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923914613


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -267,8 +276,31 @@ private void removeTask(final TaskId taskId) {
                 transitToUpdateStandbysIfOnlyStandbysLeft();
                 log.debug((task.isActive() ? "Active" : "Standby")
                     + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+            } else if (pausedTasks.containsKey(taskId)) {
+                task = pausedTasks.get(taskId);
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                pausedTasks.remove(taskId);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the paused tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not updating or paused.");
+            }

Review Comment:
   Sure, added one.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923921791


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -520,6 +568,8 @@ private Stream<Task> getStreamOfTasks() {
                         restoredActiveTasks.stream(),
                         Stream.concat(
                             exceptionsAndFailedTasks.stream().flatMap(exceptionAndTasks -> exceptionAndTasks.getTasks().stream()),
-                            removedTasks.stream()))));
+                            Stream.concat(

Review Comment:
   ack.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923899776


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java:
##########
@@ -55,7 +61,7 @@ public Task getTask() {
     }
 
     public TaskId getTaskId() {
-        if (action != Action.REMOVE) {
+        if (action != Action.REMOVE && action != Action.PAUSE) {

Review Comment:
   Ack.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923900551


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -143,6 +142,27 @@ public void shouldThrowIfStandbyTaskNotInStateRunning() {
         shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
     }
 
+    @Test
+    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
+        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+

Review Comment:
   Ack.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923909025


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -983,6 +1120,27 @@ private void verifyRemovedTasks(final Task... tasks) throws Exception {
         }
     }
 
+    private void verifyPausedTasks(final Task... tasks) throws Exception {

Review Comment:
   Ack.





[GitHub] [kafka] cadonna commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923463164


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -520,6 +568,8 @@ private Stream<Task> getStreamOfTasks() {
                         restoredActiveTasks.stream(),
                         Stream.concat(
                             exceptionsAndFailedTasks.stream().flatMap(exceptionAndTasks -> exceptionAndTasks.getTasks().stream()),
-                            removedTasks.stream()))));
+                            Stream.concat(

Review Comment:
   Could you please add a unit test `shouldGetTasksFromPausedTasks()`?
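   The change extends the nested `Stream.concat` chain so that paused tasks are included. Below is a self-contained sketch of that shape, assuming the outermost stream is the updating tasks (the hunk does not show it). `TaskCollections` and the string tasks are hypothetical; the list of lists stands in for `exceptionsAndFailedTasks`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical container for the task collections named in the diff.
class TaskCollections {
    final Map<String, String> updatingTasks = new HashMap<>();
    final List<String> restoredActiveTasks = new ArrayList<>();
    final List<List<String>> failedTaskGroups = new ArrayList<>(); // stands in for exceptionsAndFailedTasks
    final List<String> removedTasks = new ArrayList<>();
    final Map<String, String> pausedTasks = new HashMap<>();

    // Nested Stream.concat in the same shape as the diff, with paused tasks appended last.
    Stream<String> getStreamOfTasks() {
        return Stream.concat(
            updatingTasks.values().stream(),
            Stream.concat(
                restoredActiveTasks.stream(),
                Stream.concat(
                    failedTaskGroups.stream().flatMap(List::stream),
                    Stream.concat(
                        removedTasks.stream(),
                        pausedTasks.values().stream()))));
    }

    Set<String> getTasks() {
        return getStreamOfTasks().collect(Collectors.toSet());
    }
}
```

   A `shouldGetTasksFromPausedTasks()`-style check then pauses a task and asserts that it is returned. As a stylistic alternative (not what the PR does), `Stream.of(a, b, c).flatMap(s -> s)` avoids the growing nesting.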





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923898461


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -267,8 +276,31 @@ private void removeTask(final TaskId taskId) {
                 transitToUpdateStandbysIfOnlyStandbysLeft();
                 log.debug((task.isActive() ? "Active" : "Standby")
                     + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+            } else if (pausedTasks.containsKey(taskId)) {
+                task = pausedTasks.get(taskId);
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                pausedTasks.remove(taskId);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the paused tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not updating or paused.");
+            }
+        }
+
+        private void pauseTask(final TaskId taskId) {
+            final Task task = updatingTasks.get(taskId);
+            if (task != null) {
+                // do not need to unregister changelog partitions for paused tasks

Review Comment:
   This comment is meant to remind us in later PRs to decide when we should call changelogReader#unregister and register. So far the callers of these functions are not symmetric, and I suspect we will need to revisit this as we approach the end of the integration.
   
   I will remove this comment, or replace it with a more meaningful one, when we refactor further and revisit the timing of registering/unregistering partitions.
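   One way to surface the asymmetry while the integration continues is a registry that rejects unbalanced calls. The sketch below is purely hypothetical: no such class exists in Kafka Streams, partitions are plain strings rather than `TopicPartition`s, and the real `ChangelogReader#unregister` takes a collection, as in the diff. Under this model, pausing makes no call, so the partition stays registered until the task is removed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: counts register/unregister calls per changelog partition
// so that unregistering a partition that was never registered fails loudly.
class CheckedChangelogRegistry {
    private final Map<String, Integer> registrations = new HashMap<>();

    void register(final String partition) {
        registrations.merge(partition, 1, Integer::sum);
    }

    void unregister(final String partition) {
        final Integer count = registrations.get(partition);
        if (count == null) {
            throw new IllegalStateException("Unregistering " + partition + ", which was never registered");
        }
        if (count == 1) {
            registrations.remove(partition);
        } else {
            registrations.put(partition, count - 1);
        }
    }

    boolean isRegistered(final String partition) {
        return registrations.containsKey(partition);
    }
}
```

   Counting instead of using a plain set makes double registration visible too: it then takes two unregister calls before the partition is released.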
   


