You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/23 04:23:06 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r978255189


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Hi @cadonna I tried out this idea but it turns out more complicated than expected. The main issue is that if we store the paused topology outside the restore thread, and only let the caller thread to modify it, the restore thread still need to access it while handling add-task; and vice versa if we store the paused topologies inside the restore thread and let that thread to modify it upon handling pause/resume-task, the caller thread still need to access it to determine if a new action needs to be enqueued. In either case we'd construct two communication channels between the two.
   
   In the end I chose a different route, which is directly pass in the `topologyMetadata` to StateUpdater, and then periodically check that topology's paused metadata to pause / resume tasks. As a result we would also remove pause/resume-task actions as a queue item. Right now I just piggy-backed it with checkpointing but I'm okay with doing it separately, periodically. After some pondering I feel this is actually simpler than passing pause/resume actions via channels.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org