Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/18 22:04:18 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12386: KAFKA-10199: Add PAUSE in state updater

guozhangwang commented on code in PR #12386:
URL: https://github.com/apache/kafka/pull/12386#discussion_r923898461


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -267,8 +276,31 @@ private void removeTask(final TaskId taskId) {
                 transitToUpdateStandbysIfOnlyStandbysLeft();
                 log.debug((task.isActive() ? "Active" : "Standby")
                     + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+            } else if (pausedTasks.containsKey(taskId)) {
+                task = pausedTasks.get(taskId);
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                pausedTasks.remove(taskId);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the paused tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not updating or paused.");
+            }
+        }
+
+        private void pauseTask(final TaskId taskId) {
+            final Task task = updatingTasks.get(taskId);
+            if (task != null) {
+                // do not need to unregister changelog partitions for paused tasks

Review Comment:
   This comment is a reminder for later PRs about when we should call changelogReader#unregister and register. So far the callers of these functions are not symmetric, and I suspect we will need to revisit this as we approach the end of the integration.
   
   I will remove this comment, or replace it with a more meaningful one, once we refactor further and revisit the timing of registering/unregistering partitions.
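
To illustrate the symmetry concern, here is a minimal, self-contained sketch. It is not the DefaultStateUpdater code: `PauseSketch`, `Registry`, and `Updater` are hypothetical stand-ins, and the `add` and `resume` methods are assumptions that do not appear in the diff above. It only shows one possible pairing in which partitions are registered once when a task is added, stay registered while the task is paused, and are unregistered once when the task is removed from either the updating or the paused set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PauseSketch {
    // Hypothetical stand-in for the changelog reader: it only tracks registered partitions.
    static class Registry {
        final Set<String> registered = new HashSet<>();

        void register(final List<String> partitions) {
            registered.addAll(partitions);
        }

        void unregister(final List<String> partitions) {
            registered.removeAll(partitions);
        }
    }

    // Hypothetical stand-in for the state updater; tasks are reduced to id -> partitions.
    static class Updater {
        final Registry registry = new Registry();
        final Map<String, List<String>> updating = new HashMap<>();
        final Map<String, List<String>> paused = new HashMap<>();

        // Assumed entry point: the only place that registers partitions.
        void add(final String taskId, final List<String> partitions) {
            registry.register(partitions);
            updating.put(taskId, partitions);
        }

        // Pausing moves the task but leaves its partitions registered.
        void pause(final String taskId) {
            final List<String> partitions = updating.remove(taskId);
            if (partitions != null) {
                paused.put(taskId, partitions);
            }
        }

        // Assumed counterpart of pause: no re-registration is needed.
        void resume(final String taskId) {
            final List<String> partitions = paused.remove(taskId);
            if (partitions != null) {
                updating.put(taskId, partitions);
            }
        }

        // Removing unregisters, whether the task was updating or paused.
        void remove(final String taskId) {
            List<String> partitions = updating.remove(taskId);
            if (partitions == null) {
                partitions = paused.remove(taskId);
            }
            if (partitions != null) {
                registry.unregister(partitions);
            }
        }
    }

    public static void main(final String[] args) {
        final Updater updater = new Updater();
        updater.add("0_0", List.of("store-changelog-0"));
        updater.pause("0_0");
        System.out.println("registered while paused: " + updater.registry.registered);
        updater.remove("0_0");
        System.out.println("registered after removal: " + updater.registry.registered);
    }
}
```

In this sketch every `register` call has exactly one matching `unregister` call per task lifetime, which is the kind of invariant the later refactoring could aim for. Whether paused tasks should in fact stay registered is the open question raised in the comment.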
   


