Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/07 12:35:07 UTC

[GitHub] [kafka] cadonna opened a new pull request, #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

cadonna opened a new pull request, #12600:
URL: https://github.com/apache/kafka/pull/12600

   In the first attempt to handle revoked tasks in the state updater, we removed the revoked tasks from the state updater and added them to the set of pending tasks to close cleanly. This is not correct, since a revoked task that is immediately reassigned to the same stream thread would neither be re-added to the state updater nor be created again. In addition, a revoked active task might be added to more than one bookkeeping set in the tasks registry, since it might still be returned from stateUpdater.getTasks() after its removal from the state updater was requested. The reason is that the removal from the state updater is done asynchronously.
   
   This PR solves these issues by introducing a new bookkeeping set in the tasks registry that tracks revoked active tasks (more precisely, active tasks that are to be suspended).
   
   Additionally, this PR closes some gaps in test coverage around the modified code.
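
The asynchronous removal described in this PR can be illustrated with a toy model. The following is a minimal sketch and not the Kafka Streams implementation: `ToyStateUpdater`, `applyRemovals()`, and the string task IDs are hypothetical stand-ins, and the real state updater applies removals on its own thread rather than on demand.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;

// Toy model only: every name in this class is hypothetical, not a Kafka Streams type.
final class AsyncRemovalSketch {

    /** Stand-in for a state updater that applies removals some time after they are requested. */
    static final class ToyStateUpdater {
        private final Set<String> tasks = new LinkedHashSet<>();
        private final Queue<String> removalRequests = new ArrayDeque<>();

        void add(final String taskId) {
            tasks.add(taskId);
        }

        // Only records the request; the task stays visible until applyRemovals() runs.
        void remove(final String taskId) {
            removalRequests.add(taskId);
        }

        Set<String> getTasks() {
            return new LinkedHashSet<>(tasks);
        }

        // Simulates the updater catching up; returns the tasks that were actually removed.
        Set<String> applyRemovals() {
            final Set<String> removed = new LinkedHashSet<>();
            String taskId;
            while ((taskId = removalRequests.poll()) != null) {
                if (tasks.remove(taskId)) {
                    removed.add(taskId);
                }
            }
            return removed;
        }
    }

    public static void main(final String[] args) {
        final ToyStateUpdater stateUpdater = new ToyStateUpdater();
        final Set<String> pendingTasksToSuspend = new LinkedHashSet<>();

        stateUpdater.add("0_1");

        // Revocation: request the removal and remember why the task is being removed.
        stateUpdater.remove("0_1");
        pendingTasksToSuspend.add("0_1");

        // The removal has not been applied yet, so the task is still reported.
        System.out.println("still visible: " + stateUpdater.getTasks().contains("0_1"));

        // Later, each removed task is matched against the marker to decide what to do with it.
        for (final String taskId : stateUpdater.applyRemovals()) {
            if (pendingTasksToSuspend.remove(taskId)) {
                System.out.println("suspend " + taskId);
            }
        }
    }
}
```

The marker set is what lets the caller tell a revoked task apart from a task removed for another reason once the removal finally completes.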
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966987932


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
-    private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                               final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                               final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                               final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        if (task.state() == State.SUSPENDED) {
+            task.resume();
+            moveTaskFromTasksRegistryToStateUpdater(task);
+        }
+    }
+
+    private void moveTaskFromTasksRegistryToStateUpdater(final Task task) {
+        tasks.removeTask(task);
+        stateUpdater.add(task);
+    }
+
+    private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                           final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
         for (final Task task : stateUpdater.getTasks()) {
             final TaskId taskId = task.id();
-            final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
+            tasks.removePendingActiveTaskToSuspend(taskId);

Review Comment:
   In every case, we try to remove the task in the state updater from the set of pending tasks to suspend. A revoked active task will then be handled in one of the following ways:
   - reassigned as active with unmodified input partitions: the task stays in the state updater.
   - reassigned as active with modified input partitions: the task is removed from the state updater and added to the set of pending tasks that need their input partitions updated.
   - reassigned as standby: the task is removed from the state updater and added to the pending tasks to recycle.
   - not reassigned: the task is removed from the state updater and added to the pending tasks to close cleanly.
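
The four cases in this review comment can be sketched as a small decision function. This is a hedged illustration only: `classify`, the `Outcome` enum, and the use of strings for partitions are assumptions made for the example, and a `null` argument stands for "not part of the new assignment".

```java
import java.util.Set;

// Hypothetical helper for illustration; not part of TaskManager.
final class RevokedTaskOutcomeSketch {

    enum Outcome {
        STAY_IN_STATE_UPDATER,
        UPDATE_INPUT_PARTITIONS,
        RECYCLE,
        CLOSE_CLEAN
    }

    static Outcome classify(final Set<String> currentInputPartitions,
                            final Set<String> newActivePartitions,
                            final Set<String> newStandbyPartitions) {
        if (newActivePartitions != null) {
            // Reassigned as active: only changed input partitions require leaving the state updater.
            return newActivePartitions.equals(currentInputPartitions)
                ? Outcome.STAY_IN_STATE_UPDATER
                : Outcome.UPDATE_INPUT_PARTITIONS;
        }
        if (newStandbyPartitions != null) {
            // Reassigned as standby: the active task is recycled into a standby task.
            return Outcome.RECYCLE;
        }
        // Not reassigned at all.
        return Outcome.CLOSE_CLEAN;
    }

    public static void main(final String[] args) {
        final Set<String> current = Set.of("topic-0");
        System.out.println(classify(current, Set.of("topic-0"), null));
        System.out.println(classify(current, Set.of("topic-0", "topic-1"), null));
        System.out.println(classify(current, null, Set.of("changelog-0")));
        System.out.println(classify(current, null, null));
    }
}
```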





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r964788770


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();
+        if (task.state() == State.RESTORING) {
+            handleReAssignedRevokedActiveTask(task);
+        }
+    }
+
+    private void handleReAssignedRevokedActiveTask(final Task task) {
+        tasks.removeTask(task);
+        stateUpdater.add(task);
+    }
+
     private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
                                                final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        classifyRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
         for (final Task task : stateUpdater.getTasks()) {
             final TaskId taskId = task.id();
-            final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
             if (activeTasksToCreate.containsKey(taskId)) {
+                final Set<TopicPartition> inputPartitions = activeTasksToCreate.get(taskId);
                 if (task.isActive()) {
-                    if (!task.inputPartitions().equals(topicPartitions)) {
-                        stateUpdater.remove(taskId);
-                        tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);
+                    if (tasks.removePendingActiveTaskToSuspend(taskId)) {
+                        prepareRevokedActiveTaskForResuming(task, inputPartitions);
+                    } else {
+                        prepareReAssignedActiveTaskInRestorationForReUse(task, inputPartitions);
                     }
                 } else {
-                    stateUpdater.remove(taskId);
-                    tasks.addPendingTaskToRecycle(taskId, topicPartitions);
+                    prepareRunningStandbyTaskForRecycling(taskId, inputPartitions);
                 }
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
+                final Set<TopicPartition> topicPartitions = standbyTasksToCreate.get(taskId);
                 if (!task.isActive()) {
-                    if (!task.inputPartitions().equals(topicPartitions)) {
-                        stateUpdater.remove(taskId);
-                        tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);
-                    }
+                    prepareReAssignedStandbyTaskForRunning(task, topicPartitions);
                 } else {
-                    stateUpdater.remove(taskId);
-                    tasks.addPendingTaskToRecycle(taskId, topicPartitions);
+                    prepareRevokedActiveTaskForRecycling(taskId, topicPartitions);
                 }
                 standbyTasksToCreate.remove(taskId);
             } else {
-                stateUpdater.remove(taskId);
-                tasks.addPendingTaskToCloseClean(taskId);
+                prepareUnusedTaskInStateUpdaterForCleanClose(taskId);
             }
         }

Review Comment:
   This is the part that needs most attention while reviewing.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r964791407


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -250,32 +250,502 @@ public void shouldClassifyExistingTasksWithoutStateUpdater() {
     }
 
     @Test
-    public void shouldClassifyExistingTasksWithStateUpdater() {
-        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
-        final StandbyTask standbyTaskToRecycle = standbyTask(taskId02, mkSet(t2p2)).build();
-        final StandbyTask standbyTaskToClose = standbyTask(taskId04, mkSet(t2p0)).build();
-        final StreamTask restoringActiveTaskToRecycle = statefulTask(taskId03, mkSet(t1p3)).build();
-        final StreamTask restoringActiveTaskToClose = statefulTask(taskId01, mkSet(t1p1)).build();
-        final Map<TaskId, Set<TopicPartition>> standbyTasks =
-            mkMap(mkEntry(standbyTaskToRecycle.id(), standbyTaskToRecycle.changelogPartitions()));
-        final Map<TaskId, Set<TopicPartition>> restoringActiveTasks = mkMap(
-            mkEntry(restoringActiveTaskToRecycle.id(), restoringActiveTaskToRecycle.changelogPartitions())
+    public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {

Review Comment:
   We had a lot of holes in testing. I hope I closed them.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r964789596


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -786,17 +833,20 @@ private void handleRemovedTasksFromStateUpdater() {
         for (final Task task : stateUpdater.drainRemovedTasks()) {
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
+                recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
                 tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
                 stateUpdater.add(task);
+            } else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+                task.suspend();
+                tasks.addTask(task);

Review Comment:
   Here we handle revoked active tasks that were removed from the state updater.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r964790734


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java:
##########
@@ -55,6 +55,10 @@ public interface TasksRegistry {
 
     void addPendingTaskToInit(final Collection<Task> tasks);
 
+    boolean removePendingActiveTaskToSuspend(final TaskId taskId);
+
+    void addPendingActiveTaskToSuspend(final TaskId taskId);
+

Review Comment:
   New bookkeeping APIs for suspended active tasks.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970637424


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -103,42 +101,75 @@ public void addPendingStandbyTasksToCreate(final Map<TaskId, Set<TopicPartition>
 
     @Override
     public Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId) {
-        return pendingTasksToRecycle.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }
 
     @Override
     public void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
-        pendingTasksToRecycle.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
     }
 
     @Override
     public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
-        return pendingTasksToUpdateInputPartitions.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }
 
     @Override
     public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
-        pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions));
     }
 
     @Override
     public boolean removePendingTaskToCloseDirty(final TaskId taskId) {
-        return pendingTasksToCloseDirty.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_DIRTY)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }
 
     @Override
     public void addPendingTaskToCloseDirty(final TaskId taskId) {
-        pendingTasksToCloseDirty.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseDirty());
     }
 
     @Override
     public boolean removePendingTaskToCloseClean(final TaskId taskId) {
-        return pendingTasksToCloseClean.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_CLEAN)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }
 
     @Override
     public void addPendingTaskToCloseClean(final TaskId taskId) {
-        pendingTasksToCloseClean.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseClean());
+    }
+
+    @Override
+    public boolean removePendingActiveTaskToSuspend(final TaskId taskId) {
+        if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void addPendingActiveTaskToSuspend(final TaskId taskId) {
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createSuspend());
+    }
+
+    private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
+        final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
+        return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);

Review Comment:
   Done in https://github.com/apache/kafka/pull/12638
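
The hunk above replaces the separate pending collections with a single `pendingUpdateActions` map keyed by task ID. One consequence, sketched below under simplifying assumptions, is that a task ID can carry at most one pending action, because a later `put` overwrites the earlier entry. The class `PendingActionsSketch`, its nested `PendingAction` type, and the string task IDs and partitions are hypothetical; only the method names and the `Action` constants are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the single-map bookkeeping; not the actual Tasks class.
final class PendingActionsSketch {

    enum Action { RECYCLE, UPDATE_INPUT_PARTITIONS, CLOSE_DIRTY, CLOSE_CLEAN, SUSPEND }

    private static final class PendingAction {
        final Action action;
        final Set<String> inputPartitions; // null for actions that carry no partitions

        PendingAction(final Action action, final Set<String> inputPartitions) {
            this.action = action;
            this.inputPartitions = inputPartitions;
        }
    }

    private final Map<String, PendingAction> pendingUpdateActions = new HashMap<>();

    void addPendingTaskToRecycle(final String taskId, final Set<String> inputPartitions) {
        pendingUpdateActions.put(taskId, new PendingAction(Action.RECYCLE, inputPartitions));
    }

    void addPendingActiveTaskToSuspend(final String taskId) {
        pendingUpdateActions.put(taskId, new PendingAction(Action.SUSPEND, null));
    }

    // Returns the recorded input partitions, or null if the task is not pending recycling.
    Set<String> removePendingTaskToRecycle(final String taskId) {
        if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
            return pendingUpdateActions.remove(taskId).inputPartitions;
        }
        return null;
    }

    boolean removePendingActiveTaskToSuspend(final String taskId) {
        if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
            pendingUpdateActions.remove(taskId);
            return true;
        }
        return false;
    }

    private boolean containsTaskIdWithAction(final String taskId, final Action action) {
        final PendingAction pending = pendingUpdateActions.get(taskId);
        return pending != null && pending.action == action;
    }
}
```

With separate sets, the same task ID could sit in two of them at once; with one map, recording a new action for a task implicitly cancels the previous one.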





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970668525


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
-    private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                               final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                               final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                               final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        if (task.state() == State.SUSPENDED) {
+            task.resume();
+            moveTaskFromTasksRegistryToStateUpdater(task);
+        }

Review Comment:
   Could you elaborate why a suspended task cannot be reassigned as active with the cooperative assignor? Is this guaranteed?
   





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966771190


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   Method `handleReAssignedActiveTask()` is called in `classifyRunningAndSuspendedTasks()`, which classifies the tasks that are `RUNNING` or `SUSPENDED`, in other words, the tasks that are in the tasks registry. Those tasks are not in state `RESTORING`, so the `if`-branch in your proposal would never be executed.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966274214


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   I'm not sure I follow, but in any case, after thinking about it again, I think we do not need the pending-tasks-suspended set, since we could ignore those restoring active tasks at the `handleRevocation` phase.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966978149


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
-    private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                               final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                               final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                               final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        if (task.state() == State.SUSPENDED) {
+            task.resume();
+            moveTaskFromTasksRegistryToStateUpdater(task);
+        }

Review Comment:
   We only resume suspended tasks. If a task is resumed it is removed from the tasks registry and added to the state updater. The state of the task is `RESTORING` after resuming. 
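
The rule described above can be sketched as a tiny state machine. This is a simplified illustration, not the actual `TaskManager` code: the class `TaskStateSketch`, its three-value `State` enum, and the boolean return value that stands in for `moveTaskFromTasksRegistryToStateUpdater()` are assumptions made for the example.

```java
final class TaskStateSketch {
    // Hypothetical, reduced set of task states used only in this sketch.
    enum State { RUNNING, SUSPENDED, RESTORING }

    private State state;

    TaskStateSketch(final State initialState) {
        this.state = initialState;
    }

    State state() {
        return state;
    }

    // In this sketch, resuming only has an effect on a suspended task.
    void resume() {
        if (state == State.SUSPENDED) {
            state = State.RESTORING;
        }
    }

    // Returns true if the task should be handed over to the state updater.
    static boolean handleReAssignedActiveTask(final TaskStateSketch task) {
        if (task.state() == State.SUSPENDED) {
            task.resume();
            return true;
        }
        // A running task stays in the tasks registry untouched.
        return false;
    }

    public static void main(final String[] args) {
        final TaskStateSketch suspended = new TaskStateSketch(State.SUSPENDED);
        System.out.println(handleReAssignedActiveTask(suspended) + " " + suspended.state());

        final TaskStateSketch running = new TaskStateSketch(State.RUNNING);
        System.out.println(handleReAssignedActiveTask(running) + " " + running.state());
    }
}
```

Only the suspended task changes state and is flagged for the state updater; the running task is left as it is.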





[GitHub] [kafka] cadonna commented on pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1246332926

   Build failures are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / kafka.test.ClusterTestExtensionsTest.[1] Type=ZK, Name=Generated Test, MetadataVersion=3.3-IV3, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```
   




[GitHub] [kafka] cadonna merged pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna merged PR #12600:
URL: https://github.com/apache/kafka/pull/12600




[GitHub] [kafka] guozhangwang commented on pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1241058075

   > I am not sure I can follow. Are you proposing to not recycle or resume revoked active tasks?
   
   What I'm proposing is that, for restoring active tasks, we can ignore them at the `handleRevocation` phase and handle them only at the `handleAssignment` phase, where we would then know whether the task is still owned, should be closed, or should be recycled. At that time we add them to the corresponding pending tasks, and then call `stateUpdater.remove`.




[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966755655


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
   A revoked active task could be immediately reassigned (i.e., in the very next assignment) to the stream thread as an active task or a standby task. If we closed the revoked active task, we would need to re-create the reassigned active or standby task. If we suspend the revoked active task, we can resume it if it is immediately reassigned as an active task, and we can recycle it if it is immediately reassigned as a standby task.
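
The three possible outcomes for a suspended active task at the next assignment can be sketched as a small classification function. This is only an illustration of the reasoning above: `ReassignmentSketch`, the `Outcome` enum, and the use of plain strings as task ids are hypothetical and not part of the Kafka code base.

```java
import java.util.Collections;
import java.util.Set;

final class ReassignmentSketch {
    // Hypothetical outcomes for a suspended active task in the next assignment.
    enum Outcome { RESUME, RECYCLE, CLOSE_CLEAN }

    static Outcome classify(final String taskId,
                            final Set<String> activeTasksToCreate,
                            final Set<String> standbyTasksToCreate) {
        if (activeTasksToCreate.contains(taskId)) {
            // Reassigned as active: the suspended task can be resumed.
            return Outcome.RESUME;
        }
        if (standbyTasksToCreate.contains(taskId)) {
            // Reassigned as standby: the suspended task can be recycled.
            return Outcome.RECYCLE;
        }
        // Not reassigned to this stream thread: close it cleanly.
        return Outcome.CLOSE_CLEAN;
    }

    public static void main(final String[] args) {
        final Set<String> none = Collections.emptySet();
        final Set<String> task = Collections.singleton("0_1");
        System.out.println(classify("0_1", task, none));
        System.out.println(classify("0_1", none, task));
        System.out.println(classify("0_1", none, none));
    }
}
```

Had the task been closed at revocation instead, the first two cases would both require creating a new task object.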





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966762475


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());
                         stateUpdater.remove(restoringTask.id());
                         remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());

Review Comment:
   No, an input partition can only be assigned to one consumer in a consumer group, because Kafka only allows one committed offset per input partition. If an input partition were shared by two consumers, they would commit two potentially different offsets for the same input partition, so one consumer would overwrite the offset of the other. In case of fail-over, a consumer could read an offset that it did not commit before the fail-over and restart processing at the wrong offset.
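
The overwrite problem can be illustrated with a toy model of the committed-offset store. This is a sketch under simplifying assumptions, not the Kafka client API: `CommittedOffsetsSketch` and its methods are hypothetical, and the partition is identified by a plain string.

```java
import java.util.HashMap;
import java.util.Map;

final class CommittedOffsetsSketch {
    // Toy model: exactly one committed offset per input partition of a group.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    void commit(final String partition, final long offset) {
        // A later commit replaces whatever was committed before.
        committedOffsets.put(partition, offset);
    }

    Long committed(final String partition) {
        return committedOffsets.get(partition);
    }

    public static void main(final String[] args) {
        final CommittedOffsetsSketch group = new CommittedOffsetsSketch();
        group.commit("input-0", 100L); // hypothetical consumer A has processed up to 100
        group.commit("input-0", 40L);  // hypothetical consumer B has processed up to 40
        // After a fail-over, consumer A would restart from B's offset.
        System.out.println(group.committed("input-0"));
    }
}
```

In this model, consumer A's progress is lost as soon as consumer B commits, which is the situation the exclusive assignment of partitions avoids.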





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970078065


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -103,42 +101,75 @@ public void addPendingStandbyTasksToCreate(final Map<TaskId, Set<TopicPartition>
 
     @Override
     public Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId) {
-        return pendingTasksToRecycle.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }
 
     @Override
     public void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
-        pendingTasksToRecycle.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
     }
 
     @Override
     public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
-        return pendingTasksToUpdateInputPartitions.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }
 
     @Override
     public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
-        pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions));
     }
 
     @Override
     public boolean removePendingTaskToCloseDirty(final TaskId taskId) {
-        return pendingTasksToCloseDirty.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_DIRTY)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }
 
     @Override
     public void addPendingTaskToCloseDirty(final TaskId taskId) {
-        pendingTasksToCloseDirty.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseDirty());
     }
 
     @Override
     public boolean removePendingTaskToCloseClean(final TaskId taskId) {
-        return pendingTasksToCloseClean.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_CLEAN)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }
 
     @Override
     public void addPendingTaskToCloseClean(final TaskId taskId) {
-        pendingTasksToCloseClean.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseClean());
+    }
+
+    @Override
+    public boolean removePendingActiveTaskToSuspend(final TaskId taskId) {
+        if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void addPendingActiveTaskToSuspend(final TaskId taskId) {
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createSuspend());
+    }
+
+    private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
+        final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
+        return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);

Review Comment:
   nit: why not put the `!` inside as `pendingUpdateAction != null && pendingUpdateAction.getAction() == action`?
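
The two forms are equivalent by De Morgan's law, which the following sketch checks on a simplified lookup. The class `PendingActionLookup`, its reduced `Action` enum, and the string task ids are assumptions for illustration; only the shape of the boolean expressions is taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

final class PendingActionLookup {
    // Hypothetical, reduced set of actions used only in this sketch.
    enum Action { RECYCLE, SUSPEND, CLOSE_CLEAN }

    // Form used in the diff: a negated disjunction.
    static boolean negatedForm(final Map<String, Action> pending,
                               final String taskId,
                               final Action action) {
        final Action found = pending.get(taskId);
        return !(found == null || found != action);
    }

    // Form suggested in the review: the equivalent conjunction.
    static boolean directForm(final Map<String, Action> pending,
                              final String taskId,
                              final Action action) {
        final Action found = pending.get(taskId);
        return found != null && found == action;
    }

    public static void main(final String[] args) {
        final Map<String, Action> pending = new HashMap<>();
        pending.put("0_1", Action.SUSPEND);
        System.out.println(negatedForm(pending, "0_1", Action.SUSPEND)
            == directForm(pending, "0_1", Action.SUSPEND));
    }
}
```

Both methods return the same result for a matching action, a different action, and a missing task id; the direct form simply reads without the outer negation.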



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
-    private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                               final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                               final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                               final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        if (task.state() == State.SUSPENDED) {
+            task.resume();
+            moveTaskFromTasksRegistryToStateUpdater(task);
+        }

Review Comment:
   This is not a suggestion: actually, if an active task is first suspended in `revocation` and then re-assigned in `assignment`, we do not need to put it back into the state updater before it transits to `running`, since it should have nothing to restore at all. The state transition `running -> suspended -> restoring -> running` seems like overkill, since a suspended task that is resumed should be able to run immediately.
   
   On the other hand, since Streams uses the cooperative assignor now, all suspended tasks should be closed/recycled anyway, not re-assigned (only with the eager assignor could we be reassigning a revoked partition inefficiently). For now we still keep a `suspended` state for recycling. But suppose that in the future we have a function that is on par with closeClean/Dirty, such as a direct recycle that saves the state managers; then I think we can remove the whole `suspend` state as well, and this logic can be further simplified.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r968443807


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
   I introduced one map for all pending sets in `Tasks`. Now a task can only be part of the latest pending set it was assigned to.
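
The effect of keeping all pending sets in one map can be sketched as follows. This is a simplified stand-in for the `Tasks` change, not the real class: `PendingActionsSketch` uses string task ids and a bare `Action` enum instead of `TaskId` and `PendingUpdateAction`, and the helper `removeIfAction()` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class PendingActionsSketch {
    // Hypothetical, reduced set of actions used only in this sketch.
    enum Action { CLOSE_CLEAN, SUSPEND }

    // One map for all pending actions: a task id maps to at most one action.
    private final Map<String, Action> pendingUpdateActions = new HashMap<>();

    void addPendingTaskToCloseClean(final String taskId) {
        pendingUpdateActions.put(taskId, Action.CLOSE_CLEAN);
    }

    void addPendingActiveTaskToSuspend(final String taskId) {
        // Overwrites any action recorded earlier for the same task.
        pendingUpdateActions.put(taskId, Action.SUSPEND);
    }

    boolean removePendingTaskToCloseClean(final String taskId) {
        return removeIfAction(taskId, Action.CLOSE_CLEAN);
    }

    boolean removePendingActiveTaskToSuspend(final String taskId) {
        return removeIfAction(taskId, Action.SUSPEND);
    }

    // Removes the entry only if the recorded action matches the requested one.
    private boolean removeIfAction(final String taskId, final Action action) {
        if (pendingUpdateActions.get(taskId) == action) {
            pendingUpdateActions.remove(taskId);
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final PendingActionsSketch tasks = new PendingActionsSketch();
        tasks.addPendingTaskToCloseClean("0_1");
        tasks.addPendingActiveTaskToSuspend("0_1");
        System.out.println(tasks.removePendingTaskToCloseClean("0_1"));
        System.out.println(tasks.removePendingActiveTaskToSuspend("0_1"));
    }
}
```

With separate sets per action, the task in this example would have ended up in both the close-clean set and the suspend set; with a single map, only the most recent action survives.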





[GitHub] [kafka] cadonna commented on pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1240827367

   > we reduce removeRevokedTasksFromStateUpdater to only record in the pending tasks to suspend, but not try to remove from state updaters. And in handleAssignment we just update the tasks bookkeeping from suspended to closed in addition calling remove from the state updater. 
   
   I am not sure I can follow. Are you proposing to not recycle or resume revoked active tasks?




[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r965876179


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   Although I like the more explicit nature of your proposal, I do not think that it is correct. If a task is in `SUSPENDED`, it transits to `RESTORING` only after the call to `task.resume()`. Reassigned revoked active tasks should be in `SUSPENDED` and not in `RESTORING`.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966750330


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();
+        if (task.state() == State.RESTORING) {
+            handleReAssignedRevokedActiveTask(task);
+        }
+    }
+
+    private void handleReAssignedRevokedActiveTask(final Task task) {
+        tasks.removeTask(task);
+        stateUpdater.add(task);
+    }
+
     private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
                                                final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        classifyRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);

Review Comment:
   That is correct! Having the standby task on the same stream thread or even the same Streams client would defeat the purpose of the standby task, since when the Streams client fails, both the active and the standby task might lose their state.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r967380695


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -250,32 +250,502 @@ public void shouldClassifyExistingTasksWithoutStateUpdater() {
     }
 
     @Test
-    public void shouldClassifyExistingTasksWithStateUpdater() {
-        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
-        final StandbyTask standbyTaskToRecycle = standbyTask(taskId02, mkSet(t2p2)).build();
-        final StandbyTask standbyTaskToClose = standbyTask(taskId04, mkSet(t2p0)).build();
-        final StreamTask restoringActiveTaskToRecycle = statefulTask(taskId03, mkSet(t1p3)).build();
-        final StreamTask restoringActiveTaskToClose = statefulTask(taskId01, mkSet(t1p1)).build();
-        final Map<TaskId, Set<TopicPartition>> standbyTasks =
-            mkMap(mkEntry(standbyTaskToRecycle.id(), standbyTaskToRecycle.changelogPartitions()));
-        final Map<TaskId, Set<TopicPartition>> restoringActiveTasks = mkMap(
-            mkEntry(restoringActiveTaskToRecycle.id(), restoringActiveTaskToRecycle.changelogPartitions())
+    public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {

Review Comment:
   I made a brief pass over the tests and did not spot any obvious issues. Great job on improving the coverage!



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
   I think we should remove the task from the other pending sets if it is in any of them, since the handling of removed / restored tasks from the state updater checks the other pending sets first (https://github.com/apache/kafka/pull/12600/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR804), right?
   
   I was thinking about the following steps:
   
   T1: a first rebalance completes in which a task is recycled to active; it is added to pending-recycle and `stateUpdater.remove` is called.
   T2: a new rebalance starts and we call `handleRevocation`; the task is revoked, so we add it to the pending-suspend set as well.
   T3: finally the task is returned from the state updater; we check pending-recycle first and trigger that logic, which would be wrong.
   
   More generally, I think as a principle a task should be in at most one of the pending sets in Tasks. So whenever we add a task to a pending set, we should make sure it is not in any other pending set.
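
   One possible way to enforce this principle is sketched below, under the assumption that a single map from task ID to pending action replaces the separate pending sets. `PendingTasks`, `PendingAction`, `setPending`, and `drainPending` are hypothetical names for illustration and not part of the existing `Tasks` class; task IDs are plain strings here to keep the sketch self-contained.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch: these names are illustrative, not the Kafka Streams API.
   enum PendingAction { RECYCLE, CLOSE_CLEAN, CLOSE_DIRTY, UPDATE_INPUT_PARTITIONS, SUSPEND }

   final class PendingTasks {
       // One entry per task ID, so a task can never carry two pending actions at once.
       private final Map<String, PendingAction> pendingActions = new HashMap<>();

       // Recording a new action replaces whatever was pending for that task before.
       void setPending(final String taskId, final PendingAction action) {
           pendingActions.put(taskId, action);
       }

       // Removes and returns the pending action, or null if none was recorded.
       PendingAction drainPending(final String taskId) {
           return pendingActions.remove(taskId);
       }
   }
   ```

   With this shape, a task that is first marked `RECYCLE` and later marked `SUSPEND` would be suspended when it is returned from the state updater, because the later action overwrites the earlier one instead of coexisting with it.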



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -817,14 +867,17 @@ private boolean handleRestoredTasksFromStateUpdater(final long now,
         for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
+                recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
                 tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
                 transitRestoredTaskToRunning(task, now, offsetResetter);
+            } else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+                task.suspend();
+                tasks.addTask(task);

Review Comment:
   Ah right, that makes sense. We'd still need the pending suspend tasks then.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970973727


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
-    private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                               final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                               final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                               final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        if (task.state() == State.SUSPENDED) {
+            task.resume();
+            moveTaskFromTasksRegistryToStateUpdater(task);
+        }

Review Comment:
   Yes, within a single rebalance: with cooperative rebalancing, revocation and assignment happen at the same time, i.e. at the end of the rebalance, instead of revocation happening at the beginning and assignment at the end. So with cooperative rebalancing we know that a revoked partition (and hence its task) is definitely going to be reassigned.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r964790082


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -817,14 +867,17 @@ private boolean handleRestoredTasksFromStateUpdater(final long now,
         for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
+                recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
                 tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
                 transitRestoredTaskToRunning(task, now, offsetResetter);
+            } else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+                task.suspend();
+                tasks.addTask(task);

Review Comment:
   Here we handle revoked active tasks that have been already restored.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966762475


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());
                         stateUpdater.remove(restoringTask.id());
                         remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());

Review Comment:
   No, an input partition can only be assigned to one consumer in a consumer group, because Kafka only allows committing one offset per input partition. If an input partition were shared, two consumers would commit two potentially different offsets for the same input partition, so one consumer would overwrite the offset of the other. In case of fail-over, a consumer would read an offset that it did not commit before the fail-over and restart processing at the wrong offset.
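
   The overwrite problem can be illustrated with a toy model. `ToyOffsetStore` is a hypothetical stand-in for the committed offsets of one consumer group and is not a Kafka class; partitions are plain strings to keep the sketch self-contained.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Toy model only: ToyOffsetStore is hypothetical and not part of Kafka.
   final class ToyOffsetStore {
       // A single committed offset per input partition, as described above.
       private final Map<String, Long> committedOffsets = new HashMap<>();

       // A commit replaces any offset previously committed for the partition.
       void commit(final String partition, final long offset) {
           committedOffsets.put(partition, offset);
       }

       // Returns the last committed offset, or null if nothing was committed.
       Long committed(final String partition) {
           return committedOffsets.get(partition);
       }
   }
   ```

   If two consumers shared a partition and committed, say, offsets 100 and then 40 to the same store, only 40 would remain, so after a fail-over the first consumer would resume from a position it never committed.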





[GitHub] [kafka] cadonna commented on pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1239332591

   Call for review: @wcarlson5 @lihaosky 




[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966773511


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   Regarding the need for bookkeeping suspended tasks, I thought it through and I think we still need it for revoked active tasks in the state updater that have already been restored or that will be restored between the calls to `handleRevocation()` and `handleAssignment()`. I will update this PR and we can then discuss this matter further.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r965355508


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   nit: maybe we can distinguish the cases more explicitly, as:
   
   ```
   if (task.state() == State.RESTORING) {
       handleReAssignedRevokedActiveTask(task);
   } else if (task.state() == State.SUSPENDED) {
       task.resume();
   } else if (task.state() == State.RUNNING) {
       // do nothing, this is the case that tasks were not revoked before
   } else {
       throw new IllegalStateException("other states should not happen");
   }
   ```





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r968207039


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
   Yeah, I had similar thoughts that a task must be in only one of the bookkeeping sets. I even thought about a dedicated data structure for that. Let me create a PR for it.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r968441157


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,96 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
-    private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                               final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                               final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                               final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        if (task.state() == State.SUSPENDED) {
+            task.resume();
+            moveTaskFromTasksRegistryToStateUpdater(task);
+        }
+    }
+
+    private void moveTaskFromTasksRegistryToStateUpdater(final Task task) {
+        tasks.removeTask(task);
+        stateUpdater.add(task);
+    }
+
+    private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                           final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
         for (final Task task : stateUpdater.getTasks()) {
             final TaskId taskId = task.id();
-            final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
             if (activeTasksToCreate.containsKey(taskId)) {
+                final Set<TopicPartition> inputPartitions = activeTasksToCreate.get(taskId);
                 if (task.isActive()) {
-                    if (!task.inputPartitions().equals(topicPartitions)) {
-                        stateUpdater.remove(taskId);
-                        tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);
-                    }
+                    updateInputPartitionsOrRemoveTaskFromTasksToSuspend(task, inputPartitions);
                 } else {
-                    stateUpdater.remove(taskId);
-                    tasks.addPendingTaskToRecycle(taskId, topicPartitions);
+                    removeTaskToRecycleFromStateUpdater(taskId, inputPartitions);
                 }
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    if (!task.inputPartitions().equals(topicPartitions)) {
-                        stateUpdater.remove(taskId);
-                        tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);
-                    }
-                } else {
-                    stateUpdater.remove(taskId);
-                    tasks.addPendingTaskToRecycle(taskId, topicPartitions);
+                if (task.isActive()) {
+                    removeTaskToRecycleFromStateUpdater(taskId, standbyTasksToCreate.get(taskId));
                 }
                 standbyTasksToCreate.remove(taskId);
             } else {
-                stateUpdater.remove(taskId);
-                tasks.addPendingTaskToCloseClean(taskId);
+                removeUnusedTaskFromStateUpdater(taskId);
             }
         }
     }
 
+    private void updateInputPartitionsOrRemoveTaskFromTasksToSuspend(final Task task,
+                                                                     final Set<TopicPartition> inputPartitions) {
+        final TaskId taskId = task.id();
+        if (!task.inputPartitions().equals(inputPartitions)) {
+            stateUpdater.remove(taskId);
+            tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions);
+        } else {
+            tasks.removePendingActiveTaskToSuspend(taskId);

Review Comment:
   We only remove a task from the set of tasks to suspend if the task is re-assigned as active and its input partitions are unchanged. In all other cases the task is given a different pending update action (e.g., update input partitions), which automatically removes it from the set of tasks to suspend, since a task can have at most one pending update action.
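The two branches described above can be sketched as follows. This is a minimal illustration, not the code in the PR: the class `ReassignmentSketch`, the enum `Pending`, and the use of plain strings for task IDs and partitions are assumptions made for the example, and the state updater itself is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the bookkeeping; not the actual Tasks/TaskManager classes.
class ReassignmentSketch {
    enum Pending { SUSPEND, UPDATE_INPUT_PARTITIONS }

    // At most one pending action per task, because a map holds one value per key.
    final Map<String, Pending> pending = new HashMap<>();

    // On revocation, the task is marked as "to suspend".
    void onRevoked(final String taskId) {
        pending.put(taskId, Pending.SUSPEND);
    }

    // On re-assignment as active, either replace the pending action or clear the suspension.
    void onReassignedAsActive(final String taskId,
                              final Set<String> currentPartitions,
                              final Set<String> assignedPartitions) {
        if (!currentPartitions.equals(assignedPartitions)) {
            // Overwrites SUSPEND, so no explicit removal is needed in this branch.
            pending.put(taskId, Pending.UPDATE_INPUT_PARTITIONS);
        } else {
            // Same partitions: the only thing to undo is the pending suspension.
            pending.remove(taskId, Pending.SUSPEND);
        }
    }
}
```

In this sketch a task that is revoked and then re-assigned with the same partitions ends up with no pending action, while one re-assigned with different partitions ends up with exactly one.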





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r968378813


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -52,11 +53,8 @@ class Tasks implements TasksRegistry {
     // we receive a new assignment and they are revoked from the thread.
     private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>();
-    private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>();
-    private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
-    private final Set<TaskId> pendingTasksToCloseClean = new HashSet<>();
-    private final Set<TaskId> pendingTasksToCloseDirty = new HashSet<>();
+    private final Map<TaskId, PendingUpdateAction> pendingUpdateActions = new HashMap<>();

Review Comment:
   This map ensures that each task has at most one pending update action. Only the most recently assigned pending update action for a task is stored in the map.
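To illustrate why a single map gives the "at most one action per task" guarantee, here is a small sketch. The class `PendingActionsSketch`, the enum `Action`, and its method names are hypothetical stand-ins and do not reproduce the `PendingUpdateAction` type from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the pending-action bookkeeping; task IDs are plain strings here.
class PendingActionsSketch {
    enum Action { RECYCLE, UPDATE_INPUT_PARTITIONS, CLOSE_CLEAN, CLOSE_DIRTY, SUSPEND }

    private final Map<String, Action> pendingUpdateActions = new HashMap<>();

    // Adding an action replaces whatever was stored for the task before.
    void addPending(final String taskId, final Action action) {
        pendingUpdateActions.put(taskId, action);
    }

    // Removes the entry only if the stored action is the expected one; returns whether it did.
    boolean removePending(final String taskId, final Action expected) {
        return pendingUpdateActions.remove(taskId, expected);
    }

    // Returns the stored action, or null if the task has no pending action.
    Action pendingFor(final String taskId) {
        return pendingUpdateActions.get(taskId);
    }
}
```

With separate sets per action, as in the removed fields, a task ID could end up in several sets at once; with one map keyed by task ID that cannot happen.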





[GitHub] [kafka] lihaosky commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
lihaosky commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966520188


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
   noob question: Why do we suspend instead of close?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();
+        if (task.state() == State.RESTORING) {
+            handleReAssignedRevokedActiveTask(task);
+        }
+    }
+
+    private void handleReAssignedRevokedActiveTask(final Task task) {
+        tasks.removeTask(task);
+        stateUpdater.add(task);
+    }
+
     private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
                                                final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        classifyRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);

Review Comment:
   noob question: are taskIds in `activeTasksToCreate` and `standbyTasksToCreate` always mutually exclusive? I guess standby is always disabled if there's only 1 host/node? 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        tasks.addPendingActiveTaskToSuspend(restoringTask.id());
                         stateUpdater.remove(restoringTask.id());
                         remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());

Review Comment:
   Is it possible for tasks A and B to share input partitions? If so, won't the `removeAll` prevent A and B from both being revoked?
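To make the question concrete, the sketch below replays the `containsAll` / `removeAll` pattern from the diff on plain string sets. Whether two tasks can actually share input partitions is not answered in this comment; the example only shows what the loop would do under that assumption. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical replay of the revocation loop; tasks and partitions are plain strings.
class SharedPartitionsSketch {
    // Returns the IDs of the tasks the loop would treat as revoked, in iteration order.
    static List<String> revokedTasks(final Map<String, Set<String>> taskToPartitions,
                                     final Set<String> revokedPartitions) {
        final Set<String> remaining = new HashSet<>(revokedPartitions);
        final List<String> revoked = new ArrayList<>();
        for (final Map.Entry<String, Set<String>> entry : taskToPartitions.entrySet()) {
            if (remaining.containsAll(entry.getValue())) {
                revoked.add(entry.getKey());
                // Partitions of a revoked task are consumed and cannot match a later task.
                remaining.removeAll(entry.getValue());
            }
        }
        return revoked;
    }
}
```

In this sketch, if A owns `{t-0, t-1}` and B owns `{t-1, t-2}` and all three partitions are revoked, only the task visited first is reported as revoked. With disjoint partition sets both tasks are reported.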





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966773511


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti
         }
     }
 
-    private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   Regarding the need for bookkeeping suspended tasks: I thought it through, and I think we still need it for revoked active tasks in the state updater that have already been restored or that will be restored between the calls to `handleRevocation()` and `handleAssignment()`. I will update this PR, and we can then discuss this matter further.





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966991995


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -817,14 +867,17 @@ private boolean handleRestoredTasksFromStateUpdater(final long now,
         for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
+                recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
                 tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
                 transitRestoredTaskToRunning(task, now, offsetResetter);
+            } else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+                task.suspend();
+                tasks.addTask(task);

Review Comment:
   We still need the set of pending active tasks to suspend because an iteration of the poll loop might happen between the calls to `handleRevocation()` and `handleAssignment()`. During that iteration, a revoked active task might finish restoration, be removed from the state updater, and then be processed, which would violate the assumption that revoked tasks are no longer processed (i.e., no longer commit offsets).
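The race described above can be sketched with a simplified model. The class `RestoredTaskHandlingSketch` and its fields are hypothetical; task IDs are plain strings, and "suspended" and "running" are just lists rather than real task states.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of handling tasks drained from the state updater after restoration.
class RestoredTaskHandlingSketch {
    final Set<String> pendingActiveTasksToSuspend = new HashSet<>();
    final List<String> suspended = new ArrayList<>();
    final List<String> running = new ArrayList<>();

    // Called when partitions are revoked: remember that the task must not be processed.
    void handleRevocation(final String taskId) {
        pendingActiveTasksToSuspend.add(taskId);
    }

    // Called from a poll-loop iteration with the tasks that finished restoration.
    void handleRestored(final List<String> restoredTaskIds) {
        for (final String taskId : restoredTaskIds) {
            if (pendingActiveTasksToSuspend.remove(taskId)) {
                // Revoked in the meantime: park the task instead of processing it.
                suspended.add(taskId);
            } else {
                running.add(taskId);
            }
        }
    }
}
```

Without the pending set, the second method would have no way to tell a revoked task from any other restored task and would move it to running.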


