Posted to dev@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2023/03/24 22:12:00 UTC

[jira] [Created] (KAFKA-14847) Separate the callers of commitAllTasks v.s. commitTasks for EOS(-v2) and ALOS

Guozhang Wang created KAFKA-14847:
-------------------------------------

             Summary: Separate the callers of commitAllTasks v.s. commitTasks for EOS(-v2) and ALOS
                 Key: KAFKA-14847
                 URL: https://issues.apache.org/jira/browse/KAFKA-14847
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang


Today, EOS-v2, EOS-v1, and ALOS share the same internal call path inside TaskManager/TaskExecutor for committing tasks in various scenarios. The call path {{commitTasksAndMaybeUpdateCommitableOffsets}} -> {{commitOffsetsOrTransaction}} takes a list of tasks as its input, which can be a subset of the tasks that the thread / task manager owns. For EOS-v1 / ALOS, it is fine to commit just a subset of the tasks. For EOS-v2, however, all tasks participate in the same txn, so committing a subset could lead to dangerous violations, and today we rely on every caller of the commit function to make sure that the list of tasks it passes in does not violate the semantics under EOS-v2. As summarized by Matthias (thanks!), this callee can currently be triggered in the following cases:

1) Inside handleRevocation() -- this is a clean path, and we add all non-revoked tasks with the commitNeeded() flag set to the commit -- so this seems to be fine.
2) tryCloseCleanAllActiveTasks() -- here we only call it if tasksToCloseDirty.isEmpty(), so it seems fine, too.
3) commit() with a list of tasks handed in -- we call commit() inside the TM three times:
3.a) inside commitAll() as commit(tasks.values()) (passing in all tasks)
3.b) inside maybeCommitActiveTasksPerUserRequested as commit(activeTaskIterable()); (passing in all tasks)
3.c) inside handleCorruption() -- here, we only consider RUNNING and RESTORING tasks that are not corrupted. Note that we only throw a TaskCorruptedException during restore-state initialization; thus, corrupted tasks have not processed anything yet, and all other tasks should be safe to commit.
3.d) commitSuccessfullyProcessedTasks() -- this one is problematic under EOS-v2, as we commit the source offsets of only a subset of tasks while the shared transaction still commits the outgoing records of the unsuccessful tasks, if there are any.
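The hazard behind 3.d, and the reason every caller has to be audited, can be illustrated with a toy model. This is a minimal sketch assuming a single transaction shared by all of a thread's tasks; SharedTransaction and SubsetCommitDemo are hypothetical names, not Kafka Streams classes. Committing the source offsets of only task 0_0 still commits the output record of task 0_1, whose input offset does not advance, so if the thread failed before a later commit, 0_1 would reprocess that input and emit the record a second time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, not a Kafka Streams class: under EOS-v2 all tasks
// owned by a thread write their output into one shared transaction.
class SharedTransaction {
    // Output records buffered in the open transaction, from every task.
    final List<String> pendingRecords = new ArrayList<>();
    // State made durable by commits so far.
    final List<String> committedRecords = new ArrayList<>();
    final Map<String, Long> committedOffsets = new HashMap<>();

    void send(String taskId, String record) {
        pendingRecords.add(taskId + ":" + record);
    }

    // The caller chooses which tasks' source offsets to include, but the
    // transaction commit is all-or-nothing: every pending record is committed.
    void commit(Map<String, Long> sourceOffsets) {
        committedRecords.addAll(pendingRecords);
        pendingRecords.clear();
        committedOffsets.putAll(sourceOffsets);
    }
}

class SubsetCommitDemo {
    public static void main(String[] args) {
        SharedTransaction txn = new SharedTransaction();
        txn.send("0_0", "a");
        txn.send("0_1", "b"); // 0_1 produced output but is left out of the commit
        txn.commit(Map.of("0_0", 10L));
        System.out.println(txn.committedRecords); // [0_0:a, 0_1:b]
        System.out.println(txn.committedOffsets); // {0_0=10}
    }
}
```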

Just going through this list of callers, as demonstrated above, is already pretty complex and very vulnerable to bugs. It is better not to rely on the callers, but on the callee, to make sure the semantics hold. More concretely, I think we can introduce a new function called {{commitAllTasks}} such that, under EOS-v2, callers always call {{commitAllTasks}} instead, and if there are some tasks that should not be committed because we know they have not processed any data, the {{commitAllTasks}} callee itself would do some clever filtering internally.
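A minimal sketch of the proposed split, assuming EOS-v1 has already been removed and assuming commitNeeded() is the filtering criterion (the exact filtering is left open above). Mode, SketchTask, and SketchTaskManager are hypothetical stand-ins, not Kafka Streams classes, and rejecting explicit subsets under EOS-v2 is only one assumed way to force callers through commitAllTasks().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical processing modes for this sketch (EOS-v1 assumed removed).
enum Mode { AT_LEAST_ONCE, EXACTLY_ONCE_V2 }

// Hypothetical task stand-in carrying only what the sketch needs.
final class SketchTask {
    final String id;
    // Mirrors the idea of a commitNeeded() flag: true if the task has work to commit.
    final boolean commitNeeded;

    SketchTask(String id, boolean commitNeeded) {
        this.id = id;
        this.commitNeeded = commitNeeded;
    }
}

final class SketchTaskManager {
    private final Mode mode;
    // Insertion-ordered so the commit log is deterministic.
    private final Map<String, SketchTask> tasks = new LinkedHashMap<>();
    // Each entry lists the task ids committed together in one call.
    final List<List<String>> commitLog = new ArrayList<>();

    SketchTaskManager(Mode mode) {
        this.mode = mode;
    }

    void addTask(SketchTask task) {
        tasks.put(task.id, task);
    }

    // The callee, not the caller, decides which tasks join the commit:
    // every owned task that has something to commit.
    void commitAllTasks() {
        List<String> ids = new ArrayList<>();
        for (SketchTask task : tasks.values()) {
            if (task.commitNeeded) {
                ids.add(task.id);
            }
        }
        if (!ids.isEmpty()) {
            commitLog.add(ids);
        }
    }

    // Committing an explicit subset stays available only where it is safe.
    void commitTasks(List<String> subset) {
        if (mode == Mode.EXACTLY_ONCE_V2) {
            throw new IllegalStateException("use commitAllTasks() under EOS-v2");
        }
        commitLog.add(new ArrayList<>(subset));
    }
}
```

With the filter in the callee, a new call site cannot accidentally leave a task with pending output out of the shared transaction, while ALOS callers keep the option of committing a subset.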

Given its scope, I think it's better to do this refactoring after EOS-v1 is removed.


