Posted to jira@kafka.apache.org by "guozhangwang (via GitHub)" <gi...@apache.org> on 2023/02/14 19:37:38 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

guozhangwang commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1102201605


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   While working on another PR, I realized that in `checkAllUpdatingTaskStates` we only try to pause tasks once the commit interval has elapsed. That is, when we pause a named topology, its tasks may only be paused after a delay, whereas when we resume, tasks are resumed immediately.
   
   So I think we should move the pausing logic out of `checkAllUpdatingTaskStates` as well, as was done for resuming, which would reduce `checkAllUpdatingTaskStates` to just `maybeCheckpointUpdatingTasks`. WDYT?
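
   A rough sketch of that proposed loop structure, to make the idea concrete. This is a simplified, hypothetical model and not the real `DefaultStateUpdater`: the class `UpdaterLoopSketch`, its fields, and the string-based task and topology ids are all assumptions made for illustration. Pausing and resuming run on every iteration, and only checkpointing is gated on the commit interval.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the updater loop; NOT the real DefaultStateUpdater.
class UpdaterLoopSketch {
    // Both maps go from a task id to the name of the topology the task belongs to.
    final Map<String, String> updatingTasks = new HashMap<>();
    final Map<String, String> pausedTasks = new HashMap<>();
    final Set<String> pausedTopologies = new HashSet<>();
    final long commitIntervalMs;
    long lastCommitMs;
    int checkpoints = 0; // counts how often the checkpoint step actually ran

    UpdaterLoopSketch(final long commitIntervalMs, final long nowMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = nowMs;
    }

    // One loop iteration: pause and resume are checked every time,
    // only checkpointing waits for the commit interval.
    void runOnce(final long nowMs) {
        move(updatingTasks, pausedTasks, true);   // pause tasks of paused topologies
        move(pausedTasks, updatingTasks, false);  // resume tasks of resumed topologies
        maybeCheckpointUpdatingTasks(nowMs);
    }

    private void move(final Map<String, String> from,
                      final Map<String, String> to,
                      final boolean topologyPaused) {
        final Iterator<Map.Entry<String, String>> it = from.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (pausedTopologies.contains(entry.getValue()) == topologyPaused) {
                to.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    private void maybeCheckpointUpdatingTasks(final long nowMs) {
        if (nowMs - lastCommitMs >= commitIntervalMs) {
            checkpoints++; // stand-in for writing checkpoints of the updating tasks
            lastCommitMs = nowMs;
        }
    }
}
```

   With this shape, a task of a paused topology moves to `pausedTasks` on the next iteration instead of waiting for the commit interval, which matches how resuming already behaves.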



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   I think the actual culprit is, as @lucasbru said, that `StreamThread.maybeCommit` uses `TaskManager.allTasks`, while `TaskManager.allTasks` is also called from many other places. For IQ, for example, it should return all tasks, whereas for this caller it should return only the processing tasks when the state updater is enabled, i.e. only the ones in `TaskRegistry`.
   
   > I am also wondering why Streams also commits restoring tasks.
   
   For that, the rationale is to allow restoration progress to be recorded as well, in case restoration is paused due to another rebalance (though its `prepareCommit` would always return an empty map, so committing those restoring tasks reduces to writing checkpoints). But somewhere along the way we lost the logic that ever changes the `commitNeeded` flag for restoring tasks, so the commit would never be triggered for them. Hence a new JIRA was proposed to fix it, and we fixed it in the state updater.
   
   I thought about fixing it in a probably better way that involves more changes: 1) introduce a `TaskManager.allProcessingTasks`, and 2) depending on the updater-enabled flag, let `maybeCommit` call either that method or `allTasks`. At the time, I admit I thought this change was a bit too much, and since tasks in the updater should not cause any side effects and should return false from `commitNeeded`, the current approach might be just okay.
   
   But thinking about this a bit more, I think it is indeed worrisome to let `ReadOnlyTask.commitNeeded` have any potential side effects if we ever have bugs.
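
   For reference, the alternative described above could look roughly like the following. This is only a sketch under stated assumptions and not the real `TaskManager`: the class `TaskManagerSketch`, the method `tasksToConsiderForCommit`, and the string task ids are hypothetical. Only the names `allTasks` and `allProcessingTasks` come from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for TaskManager; NOT the real Kafka Streams class.
class TaskManagerSketch {
    // Tasks the stream thread processes (the ones in the task registry).
    private final List<String> processingTasks = new ArrayList<>();
    // Tasks still restoring; owned by the state updater when it is enabled.
    private final List<String> restoringTasks = new ArrayList<>();
    private final boolean stateUpdaterEnabled;

    TaskManagerSketch(final boolean stateUpdaterEnabled) {
        this.stateUpdaterEnabled = stateUpdaterEnabled;
    }

    void addProcessingTask(final String taskId) {
        processingTasks.add(taskId);
    }

    void addRestoringTask(final String taskId) {
        restoringTasks.add(taskId);
    }

    // Everything, e.g. for callers such as IQ that need all tasks.
    List<String> allTasks() {
        final List<String> all = new ArrayList<>(processingTasks);
        all.addAll(restoringTasks);
        return all;
    }

    // Only the tasks the stream thread itself processes.
    List<String> allProcessingTasks() {
        return new ArrayList<>(processingTasks);
    }

    // What a maybeCommit-like caller would iterate over (hypothetical helper).
    List<String> tasksToConsiderForCommit() {
        return stateUpdaterEnabled ? allProcessingTasks() : allTasks();
    }
}
```

   With the updater enabled, the commit path would never touch tasks owned by the updater, so `ReadOnlyTask.commitNeeded` could keep throwing.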



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   Ack. That makes sense.


