Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/28 18:16:58 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9515: KAFKA-10651: read offsets directly from checkpoint for uninitialized tasks

ableegoldman commented on a change in pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#discussion_r513664379



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -671,7 +672,7 @@ void handleLostAll() {
         // just have an empty changelogOffsets map.
         for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) {
             final Task task = tasks.get(id);
-            if (task != null) {
+            if (task != null && task.state() != State.CREATED) {

Review comment:
       Ahh, good catch: the task ultimately returns its changelog offsets from the ProcessorStateManager's `stores` map, which is cleared during close. So we should fall through to the checkpoint file for `CLOSED` tasks as well. (Technically there's currently no way for a CLOSED task to be in the TaskManager's `tasks` map when the member sends a JoinGroup, since we remove it from `tasks` immediately upon `close`. But that may not always be the case, so better safe than sorry.)
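   To make the failure mode concrete, here is a toy sketch that assumes a simplified state manager whose reported offsets are built directly from its `stores` map. `ToyStateManager`, `register`, and the string partition keys are hypothetical stand-ins, not the real `ProcessorStateManager` API. The sketch shows only that once `close()` clears the map, `changelogOffsets()` comes back empty. That is why a closed task has to fall through to the checkpoint file.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of a state manager; NOT the real
// ProcessorStateManager API. Reported offsets are derived from the live
// stores map, so clearing the stores on close also empties the offsets.
final class ToyStateManager {
    // changelog partition -> current offset (simplified, hypothetical layout)
    private final Map<String, Long> stores = new HashMap<>();

    void register(final String changelogPartition, final long offset) {
        stores.put(changelogPartition, offset);
    }

    Map<String, Long> changelogOffsets() {
        // Snapshot of whatever is currently registered.
        return new HashMap<>(stores);
    }

    void close() {
        // Assume the checkpoint file was written before this point;
        // afterwards the in-memory stores (and thus the offsets) are gone.
        stores.clear();
    }
}
```

   After `register("store-changelog-0", 42L)` the map reports offset 42; after `close()` it reports nothing.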
   
   For `SUSPENDED` tasks we should still rely on the task's changelog offsets, since a task neither writes the checkpoint nor clears out `stores` during suspend. (Actually, very little happens in `suspend` at the moment.)
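   The resulting rule can be sketched as follows: use the checkpoint file for missing, `CREATED`, and `CLOSED` tasks, and the task's own offsets otherwise, including `SUSPENDED`. `TaskState` is a standalone enum that only mirrors the Kafka Streams task state names. `OffsetSource`, `OffsetLookup`, and the checkpoint-map parameter are hypothetical, and this is not the code in the PR. The assumption that `CREATED` tasks cannot report offsets follows the PR title ("uninitialized tasks").

```java
import java.util.Map;

// Standalone mirror of the task state names; not an import of Kafka's Task.State.
enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

// Hypothetical minimal view of a task: only what the lookup needs.
interface OffsetSource {
    TaskState state();
    Map<String, Long> changelogOffsets();
}

final class OffsetLookup {
    // No task object, an uninitialized (CREATED) task, or a CLOSED task whose
    // stores were cleared: none of these can report meaningful offsets.
    // SUSPENDED keeps its stores, so it falls into the "ask the task" branch.
    static boolean readFromCheckpoint(final OffsetSource task) {
        return task == null
            || task.state() == TaskState.CREATED
            || task.state() == TaskState.CLOSED;
    }

    // checkpointFileOffsets stands in for offsets already read from the
    // task directory's checkpoint file (hypothetical parameter).
    static Map<String, Long> offsetsFor(final OffsetSource task,
                                        final Map<String, Long> checkpointFileOffsets) {
        return readFromCheckpoint(task)
            ? checkpointFileOffsets
            : task.changelogOffsets();
    }
}
```

   For example, a `SUSPENDED` task still reports its own offsets, while a `CLOSED` one gets the checkpoint values.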




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org