Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/28 00:45:03 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

ableegoldman opened a new pull request #9515:
URL: https://github.com/apache/kafka/pull/9515


   Uninitialized tasks return an empty collection from `changelogOffsets()`, which makes them indistinguishable from genuinely stateless (or un-logged) tasks. We should skip over these tasks and read directly from the checkpoint file when computing offset sums for a JoinGroup subscription.
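
The fallback described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual `TaskManager` code: `SketchTask`, `offsetSumFor`, and the string changelog keys are hypothetical stand-ins, and the checkpoint file is modeled as an in-memory map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

public class OffsetSumSketch {

    // Simplified stand-in for the task lifecycle states discussed in this PR.
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Hypothetical, minimal task model; the real Task interface is much larger.
    static final class SketchTask {
        final State state;
        final Map<String, Long> changelogOffsets;

        SketchTask(final State state, final Map<String, Long> changelogOffsets) {
            this.state = state;
            this.changelogOffsets = changelogOffsets;
        }
    }

    // Adds up all offsets in the given map.
    static long sum(final Map<String, Long> offsets) {
        long total = 0L;
        for (final long offset : offsets.values()) {
            total += offset;
        }
        return total;
    }

    // Returns the offset sum to report for one task, or empty if nothing should be reported.
    // A missing (null) or CREATED task falls through to the checkpoint contents.
    static OptionalLong offsetSumFor(final SketchTask task, final Map<String, Long> checkpoint) {
        final Map<String, Long> source =
            (task != null && task.state != State.CREATED) ? task.changelogOffsets : checkpoint;
        return source.isEmpty() ? OptionalLong.empty() : OptionalLong.of(sum(source));
    }

    public static void main(final String[] args) {
        // Assumed checkpoint contents, for illustration only.
        final Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("store-changelog-0", 40L);
        checkpoint.put("store-changelog-1", 60L);

        final SketchTask uninitialized = new SketchTask(State.CREATED, Collections.emptyMap());
        System.out.println("uninitialized task reports: " + offsetSumFor(uninitialized, checkpoint));
    }
}
```

In this sketch an uninitialized task with a non-empty checkpoint reports the checkpointed sum, while an uninitialized task with an empty checkpoint still reports nothing.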


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9515: KAFKA-10651: read offsets directly from checkpoint for uninitialized tasks

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#discussion_r513664379



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -671,7 +672,7 @@ void handleLostAll() {
         // just have an empty changelogOffsets map.
         for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) {
             final Task task = tasks.get(id);
-            if (task != null) {
+            if (task != null && task.state() != State.CREATED) {

Review comment:
       Ahh, good catch: the task ultimately returns changelog offsets from the ProcessorStateManager's `stores` map, which is cleared during close. So we should fall through to the checkpoint file for `CLOSED` tasks as well. (Technically there's currently no way for a `CLOSED` task to be in the TaskManager's `tasks` map when the member sends a JoinGroup, since we remove it from `tasks` immediately upon `close`, but that may not always be the case, so better safe than sorry.)
   
   For `SUSPENDED` we should still rely on the task's changelog offsets, since it doesn't checkpoint or clear out `stores` during suspend. (Actually, very little happens in `suspend` at the moment.)
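
As a rough illustration of the state handling proposed in this comment, the decision could be expressed as a small predicate. The class and method names below are hypothetical and the enum is a simplified copy of the task states; this is a sketch of the idea, not the code that was merged.

```java
import java.util.EnumSet;
import java.util.Set;

public class CheckpointFallbackSketch {

    // Simplified copy of the task lifecycle states named in the review.
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // States in which the task's in-memory changelog offsets are not meaningful:
    // CREATED (stores not yet initialized) and CLOSED (stores map already cleared).
    private static final Set<State> READ_FROM_CHECKPOINT = EnumSet.of(State.CREATED, State.CLOSED);

    // A null state models a task directory on disk with no task object in the tasks map.
    static boolean shouldReadFromCheckpoint(final State state) {
        return state == null || READ_FROM_CHECKPOINT.contains(state);
    }

    public static void main(final String[] args) {
        for (final State state : State.values()) {
            System.out.println(state + " -> read checkpoint? " + shouldReadFromCheckpoint(state));
        }
    }
}
```

`SUSPENDED` is deliberately left out of the set, matching the reasoning that suspend neither checkpoints nor clears the stores.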







[GitHub] [kafka] ableegoldman commented on pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#issuecomment-717691656


   > Although, as I'm typing this, I'm realizing the bug is probably that when the task is initialized, we'd report some high offsetSum, which the assignor interprets as a low lag, and when it's not initialized, then we report nothing, which the assignor interprets as a high lag (since the assignor will independently verify if it's a stateless task or not).
   
   Exactly. It shouldn't report different offset sums in two adjacent rebalances if nothing changed except that it was assigned a task. One way to look at it is that if we had just waited slightly longer to rejoin the group, the task would have been initialized with the checkpoint offsets anyway, so the checkpoint file is the source of truth while the task is still in CREATED.
   
   If the checkpoint file is empty (and the task uninitialized), then we _should_ report no offsets for that task because it doesn't have any actual state. Skipping the offset sum is technically how we handle stateless tasks, but it's also what happens for any stateful task we just don't happen to find on disk, which is exactly what the task would be if the checkpoint is empty.





[GitHub] [kafka] ableegoldman edited a comment on pull request #9515: KAFKA-10651: read offsets directly from checkpoint for uninitialized tasks

Posted by GitBox <gi...@apache.org>.
ableegoldman edited a comment on pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#issuecomment-719848567


   Cherry-picked to 2.7 (cc/ @bbejeck) and 2.6





[GitHub] [kafka] cadonna commented on a change in pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#discussion_r513290935



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -671,7 +672,7 @@ void handleLostAll() {
         // just have an empty changelogOffsets map.
         for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) {
             final Task task = tasks.get(id);
-            if (task != null) {
+            if (task != null && task.state() != State.CREATED) {

Review comment:
       Are there other task states we need to consider here besides `CREATED`? What about `CLOSED` or `SUSPENDED`?







[GitHub] [kafka] ableegoldman commented on a change in pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#discussion_r513114972



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -2479,7 +2502,7 @@ public void shouldTransmitProducerMetrics() {
             allTasks.put(task.id(), (StateMachineTask) task);
         }
         for (final Task task : restoringTasks) {
-            assertThat(task.state(), not(Task.State.RUNNING));

Review comment:
       Nothing really changed here, but we used to just leave tasks in CREATED and call them "restoring", so I had to fix this up so they really were RESTORING.







[GitHub] [kafka] vvcephei commented on pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#issuecomment-717683830


   Thanks for this quick fix, @ableegoldman!
   
   It looks like the bug before was that we would skip encoding these uninitialized tasks since they'd look "apparently stateless", and now we'll just try to read the checkpoint instead. If the checkpoint file is empty, though, it seems like the outcome is the same, right? We would not encode anything, just like a stateless task?
   
   Although, as I'm typing this, I'm realizing the bug is probably that when the task _is_ initialized, we'd report some high offsetSum, which the assignor interprets as a low lag, and when it's _not_ initialized, then we report nothing, which the assignor interprets as a high lag (since the assignor will independently verify if it's a stateless task or not). In that case, when we legitimately have no checkpoint file, then it's ok to report nothing, because we legitimately have a high lag. And it won't flip-flop in any case, because once the task gets initialized, its lag will still be about the same.
   
   Did I get that right?
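
A small worked example may make the flip-flop concrete. It assumes a simplified lag model in which the assignor computes lag as the end-offset sum minus the reported offset sum, and treats a stateful task with no reported sum as lagging by the full end-offset sum. The numbers, class name, and `lag` helper are hypothetical and chosen only for illustration.

```java
import java.util.OptionalLong;

public class LagFlipFlopSketch {

    // Assumed lag model: a missing offset sum for a stateful task counts as "no state at all",
    // so the lag equals the full end-offset sum.
    static long lag(final long endOffsetSum, final OptionalLong reportedOffsetSum) {
        return reportedOffsetSum.isPresent()
            ? Math.max(0L, endOffsetSum - reportedOffsetSum.getAsLong())
            : endOffsetSum;
    }

    public static void main(final String[] args) {
        final long endOffsetSum = 1000L;   // assumed sum of changelog end offsets
        final long checkpointSum = 990L;   // assumed sum of checkpointed offsets on disk

        // Task initialized: it reports its offsets, so the lag looks low.
        System.out.println("initialized:              " + lag(endOffsetSum, OptionalLong.of(checkpointSum)));
        // Task still CREATED, before the fix: nothing is reported, so the lag looks high.
        System.out.println("uninitialized (before):   " + lag(endOffsetSum, OptionalLong.empty()));
        // Task still CREATED, after the fix: the checkpoint is read, so the lag matches the initialized case.
        System.out.println("uninitialized (after):    " + lag(endOffsetSum, OptionalLong.of(checkpointSum)));
    }
}
```

Under these assumptions the same task would alternate between a lag of 10 and a lag of 1000 across adjacent rebalances before the fix, and report 10 consistently afterwards.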

