Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/12 00:36:21 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

ableegoldman opened a new pull request #8856:
URL: https://github.com/apache/kafka/pull/8856


   1. KAFKA-10150: 
       - always transition to SUSPENDED during `suspend`, no matter the current state
       - only call `prepareCommit` before closing if `task.commitNeeded` is true
   2. Don't commit any consumed offsets during `handleAssignment` -- revoked active tasks (and any others that need committing) will be committed during `handleRevocation`, so we only need to worry about cleaning them up in `handleAssignment`
   3. KAFKA-10152: when recycling a task we should always commit consumed offsets (if any), but don't need to write the checkpoint (since changelog offsets are preserved across task transitions) 
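
A minimal sketch of the first item, for illustration only. `SketchTask`, its fields, and `closeClean` are hypothetical stand-ins and not the real `StreamTask` or `TaskManager` code. It assumes that `suspend()` may be called from any state other than `CLOSED`, and that the actual offset commit happens between `prepareCommit()` and `postCommit()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Streams task; the real StreamTask carries far more state.
final class SketchTask {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state;
    private boolean commitNeeded;
    // Records the order of lifecycle calls so the flow is easy to inspect.
    final List<String> calls = new ArrayList<>();

    SketchTask(final State initialState, final boolean commitNeeded) {
        this.state = initialState;
        this.commitNeeded = commitNeeded;
    }

    State state() {
        return state;
    }

    boolean commitNeeded() {
        return commitNeeded;
    }

    // suspend() always ends in SUSPENDED (assumption: CLOSED is the only illegal start state).
    void suspend() {
        if (state == State.CLOSED) {
            throw new IllegalStateException("Illegal state " + state + " while suspending");
        }
        state = State.SUSPENDED;
        calls.add("suspend");
    }

    void prepareCommit() {
        calls.add("prepareCommit");
    }

    void postCommit() {
        commitNeeded = false;
        calls.add("postCommit");
    }

    void close() {
        if (state != State.SUSPENDED) {
            throw new IllegalStateException("Illegal state " + state + " while closing");
        }
        if (commitNeeded) {
            throw new IllegalStateException("Tried to close clean with pending uncommitted data");
        }
        state = State.CLOSED;
        calls.add("close");
    }

    // Only go through the commit steps when there is something to commit.
    static void closeClean(final SketchTask task) {
        task.suspend();
        if (task.commitNeeded()) {
            task.prepareCommit();
            // the actual offset commit would happen here
            task.postCommit();
        }
        task.close();
    }
}
```

With this shape, a task that has nothing to commit goes straight from `suspend()` to `close()`, while a task with pending data passes through both commit steps first.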


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643401987


   test this please





[GitHub] [kafka] guozhangwang commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-644897444


   test this





[GitHub] [kafka] guozhangwang commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-645062759


   Cherry-picked to 2.6.





[GitHub] [kafka] guozhangwang commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441026485



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -502,56 +494,24 @@ public void closeAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
-    private void maybeScheduleCheckpoint() {
-        switch (state()) {
-            case RESTORING:
-            case SUSPENDED:
-                this.checkpoint = checkpointableOffsets();
-
-                break;
-
-            case RUNNING:
-                if (!eosEnabled) {
-                    this.checkpoint = checkpointableOffsets();
-                }
-
-                break;
-
-            case CREATED:
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
-        }
-    }
-
-    private void writeCheckpointIfNeed() {
+    private void maybeWriteCheckpoint() {
         if (commitNeeded) {
+            log.error("Tried to write a checkpoint with pending uncommitted data, should complete the commit first.");
             throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
         }
-        if (checkpoint != null) {
-            stateMgr.checkpoint(checkpoint);
-            checkpoint = null;
-        }
+        stateMgr.checkpoint(checkpointableOffsets());
     }
 
     /**
-     * <pre>
-     * the following order must be followed:
-     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
-     *  2. then if we are closing on EOS and dirty, wipe out the state store directory
-     *  3. finally release the state manager lock
-     * </pre>
+     * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
      */
     private void close(final boolean clean) {
-        if (clean) {
-            executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
+        if (clean && commitNeeded) {
+            log.debug("Tried to close clean but there was an active scheduled checkpoint, this means we failed to"

Review comment:
       nit: `there was an active scheduled checkpoint` -> `there was pending uncommitted data`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       Sounds good, in that case the nested try-catch would be necessary.







[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441047360



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -474,26 +468,24 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
 
     @Override
     public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
         switch (state()) {
-            case CREATED:
             case SUSPENDED:
                 stateMgr.recycle();
                 recordCollector.close();
 
                 break;
 
-            case RESTORING: // we should have transitioned to `SUSPENDED` already
-            case RUNNING: // we should have transitioned to `SUSPENDED` already
+            case CREATED:
+            case RESTORING:
+            case RUNNING:
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
             default:
                 throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
         }
 
+        // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
+        // because otherwise we loose the partition-time information
         partitionGroup.clear();

Review comment:
       As we always suspend a task before closing (even for unclean closing), I think we can actually remove this call?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441114757



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       Ah that seems right. Thanks!







[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441047360



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -474,26 +468,24 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
 
     @Override
     public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
         switch (state()) {
-            case CREATED:
             case SUSPENDED:
                 stateMgr.recycle();
                 recordCollector.close();
 
                 break;
 
-            case RESTORING: // we should have transitioned to `SUSPENDED` already
-            case RUNNING: // we should have transitioned to `SUSPENDED` already
+            case CREATED:
+            case RESTORING:
+            case RUNNING:
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
             default:
                 throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
         }
 
+        // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
+        // because otherwise we loose the partition-time information
         partitionGroup.clear();

Review comment:
       As we always suspend a task before closing (even for unclean closing), I think we can actually remove this call? (We only needed it before, because SUSPEND could be skipped.)







[GitHub] [kafka] guozhangwang commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439553465



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {
+                        if (task.isActive()) {
+                            log.error("Active task {} was revoked and should have already been committed", task.id());
+                            throw new IllegalStateException("Revoked active task was not committed during handleRevocation");
+                        } else {
+                            task.prepareCommit();
+                            task.postCommit();
+                        }
                     }
+                    completeTaskCloseClean(task);

Review comment:
       You're right :)







[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440554249



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       I meant the latter. And I agree that if `commit` fails, we should not call `postCommit()`.
   
   For a failure in `postCommit`: we make assumptions about the current code, which seems dangerous (i.e., not future-proof). IMHO, if `postCommit` fails, we need to close the corresponding task dirty and either recreate it or rebalance, but we should also continue to call `postCommit()` for all other tasks?
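
One way the suggestion above could look, as a rough sketch rather than the code that was merged. `PostCommitSketch`, `CommittableTask`, and `postCommitAll` are hypothetical names. The only borrowed piece is the `firstException.compareAndSet(null, e)` idiom from the quoted hunk. The sketch assumes the caller closes the returned tasks dirty afterwards.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

final class PostCommitSketch {
    // Hypothetical minimal task interface, not the real Streams Task interface.
    interface CommittableTask {
        String id();

        void postCommit();
    }

    // Small helper so callers can build a task from an id and a postCommit action.
    static CommittableTask task(final String id, final Runnable postCommit) {
        return new CommittableTask() {
            @Override
            public String id() {
                return id;
            }

            @Override
            public void postCommit() {
                postCommit.run();
            }
        };
    }

    // Calls postCommit() on every task. A failure in one task does not stop the loop:
    // the first exception is remembered and the failing task is reported back as dirty.
    static Set<String> postCommitAll(final List<CommittableTask> tasks,
                                     final AtomicReference<RuntimeException> firstException) {
        final Set<String> dirtyTaskIds = new LinkedHashSet<>();
        for (final CommittableTask task : tasks) {
            try {
                task.postCommit();
            } catch (final RuntimeException e) {
                firstException.compareAndSet(null, e);
                dirtyTaskIds.add(task.id());
            }
        }
        return dirtyTaskIds;
    }
}
```

Moving the try-catch inside the loop is what lets the remaining tasks still run `postCommit()`, at the cost of having to track which tasks need a dirty close.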







[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643492619


   Retest this please.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439520870



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {
+                        if (task.isActive()) {
+                            log.error("Active task {} was revoked and should have already been committed", task.id());
+                            throw new IllegalStateException("Revoked active task was not committed during handleRevocation");
+                        } else {
+                            task.prepareCommit();
+                            task.postCommit();
+                        }
                     }
+                    completeTaskCloseClean(task);

Review comment:
       Actually we don't checkpoint during `closeClean` anymore. You always have to commit (if `clean` && `commitNeeded`) before closing.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439585848



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -112,10 +112,19 @@ public void completeRestoration() {
     @Override
     public void suspend() {
         log.trace("No-op suspend with state {}", state());
-        if (state() == State.RUNNING) {
-            transitionTo(State.SUSPENDED);
-        } else if (state() == State.RESTORING) {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending standby task " + id);
+        switch (state()) {
+            case CREATED:
+            case RUNNING:
+            case SUSPENDED:
+                transitionTo(State.SUSPENDED);
+                break;
+
+            case RESTORING:

Review comment:
       I agree 100%, but at some point in the past we started checking for `RESTORING` and throwing IllegalStateException all over StandbyTask. I wanted to keep the changes here to a minimum and figured we should at least be consistent with the current pattern elsewhere.







[GitHub] [kafka] guozhangwang commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643007770


   test this please





[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-644498795


   Retest this please.





[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643456810









[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439586118



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),         // 0
+        RESTORING(2, 3, 4),       // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3
+        CLOSED(0);                // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks

Review comment:
       I think you're looking at an old version of the PR :) 
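
For readers unfamiliar with the pattern in the quoted hunk, each state lists the ordinals of the states it may move to. Below is a self-contained sketch of that idea. `SketchState` and `isValidTransition` are illustrative names. The transition table is copied from the `+` lines of the hunk above, which, per the comment, may not match the final version of the PR.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative copy of the transition table; not the real Task.State enum.
enum SketchState {
    CREATED(1, 3, 4),    // 0
    RESTORING(2, 3, 4),  // 1
    RUNNING(3),          // 2
    SUSPENDED(1, 3, 4),  // 3
    CLOSED(0);           // 4

    // Ordinals of the states this state is allowed to transition to.
    private final Set<Integer> validTransitions = new HashSet<>();

    SketchState(final int... validTransitions) {
        for (final int next : validTransitions) {
            this.validTransitions.add(next);
        }
    }

    boolean isValidTransition(final SketchState next) {
        return validTransitions.contains(next.ordinal());
    }
}
```

Adding `3` to the lists for `CREATED` and `SUSPENDED` is what allows `suspend()` to be called on a task that has never run or is already suspended.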







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440473726



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -539,19 +537,18 @@ private void writeCheckpointIfNeed() {
     /**
      * <pre>
      * the following order must be followed:
-     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
+     *  1. commit/checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed

Review comment:
       ack

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       1. I think you're right, we don't need to keep track of the current `checkpoint` offsets at all and can just write the current `checkpointableOffsets` in `postCommit`
   2. done
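
The classification loop in the `handleAssignment` hunk above can be read in isolation as a three-way decision per existing task. This is a simplified sketch: `AssignmentSketch`, `Action`, and `classify` are hypothetical, and task ids are plain strings here rather than `TaskId` objects.

```java
import java.util.Set;

final class AssignmentSketch {
    enum Action { RESUME, RECYCLE, CLOSE }

    // Decide what to do with an already-owned task given the new assignment.
    static Action classify(final String taskId,
                           final boolean isActive,
                           final Set<String> activeTasks,
                           final Set<String> standbyTasks) {
        if (activeTasks.contains(taskId) && isActive) {
            return Action.RESUME;   // still active: update input partitions and resume
        } else if (standbyTasks.contains(taskId) && !isActive) {
            return Action.RESUME;   // still standby: update input partitions and resume
        } else if (activeTasks.contains(taskId) || standbyTasks.contains(taskId)) {
            return Action.RECYCLE;  // still owned, but active/standby status changed
        } else {
            return Action.CLOSE;    // no longer assigned to this instance
        }
    }
}
```

Because no offsets are collected in this loop, tasks marked `CLOSE` only need cleanup afterwards, which matches the second item of the PR description.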







[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643492691


   Retest this please.





[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441059100



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +717,26 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

Review comment:
       Thinking about punctuation, should we actually call `commitOffsetsOrTransaction()` unconditionally (i.e., regardless of whether `consumedOffsetsAndMetadataPerTask` is empty)?
   
   We can still move the check inside `consumedOffsetsAndMetadataPerTask`, but for EOS there might be pending writes from punctuation that we still need to commit?
   
   This would apply to all calls of `commitOffsetsOrTransaction` ?




----------------------------------------------------------------



[GitHub] [kafka] guozhangwang commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643402127


   test this please


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555458



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       `postCommit` will always write the checkpoint if the task is in SUSPENDED, which it should always be before being closed




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439532197



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -112,10 +112,19 @@ public void completeRestoration() {
     @Override
     public void suspend() {
         log.trace("No-op suspend with state {}", state());
-        if (state() == State.RUNNING) {
-            transitionTo(State.SUSPENDED);
-        } else if (state() == State.RESTORING) {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending standby task " + id);
+        switch (state()) {
+            case CREATED:
+            case RUNNING:
+            case SUSPENDED:
+                transitionTo(State.SUSPENDED);
+                break;
+
+            case RESTORING:

Review comment:
       I get that standbys should never really be in RESTORING state, but it still doesn't seem like it's philosophically any more illegal to suspend from RESTORING than it is from RUNNING. I'd vote to legalize RESTORING here. It does seem like a useful sanity check for CLOSED to be illegal, though.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),         // 0
+        RESTORING(2, 3, 4),       // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3
+        CLOSED(0);                // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks

Review comment:
       It looks like your changes in the tasks have prohibited any state from transitioning to CLOSED except SUSPENDED. Should we update the state machine to reflect this?
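
A minimal sketch of what that stricter state machine might look like, assuming the integer arguments are the ordinals of the permitted next states (as the `// 0` to `// 4` comments in the diff suggest). `SketchState` is a hypothetical stand-in for illustration, not the actual `Task.State`; here only SUSPENDED lists CLOSED (4) as a valid target.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for Task.State. Each constant lists the ordinals
// of the states it is allowed to transition to (an assumption based on
// the index comments in the diff).
enum SketchState {
    CREATED(1, 3),        // 0: may move to RESTORING or SUSPENDED
    RESTORING(2, 3),      // 1: may move to RUNNING or SUSPENDED
    RUNNING(3),           // 2: may move to SUSPENDED
    SUSPENDED(1, 3, 4),   // 3: may move to RESTORING, SUSPENDED, or CLOSED
    CLOSED(0);            // 4: may move back to CREATED (corrupted tasks)

    private final Set<Integer> validTransitions = new HashSet<>();

    SketchState(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    // True if this state lists newState's ordinal as a permitted target.
    boolean isValidTransition(final SketchState newState) {
        return validTransitions.contains(newState.ordinal());
    }
}
```

With this table, `SketchState.RUNNING.isValidTransition(SketchState.CLOSED)` is false, so a caller would have to suspend first.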

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -219,13 +214,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 tasksToRecycle.add(task);
             } else {
                 try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    task.suspend(); // Should be a no-op for all active tasks, unless we hit an exception during handleRevocation
+                    if (task.commitNeeded()) {
+                        if (task.isActive()) {
+                            log.error("Active task {} was revoked and should have already been committed", task.id());
+                            throw new IllegalStateException("Revoked active task was not committed during handleRevocation");
+                        } else {
+                            task.prepareCommit();
+                            task.postCommit();
+                        }
                     }
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    taskIter.remove();

Review comment:
       I think we previously followed the "loop over iterator and remove during iteration" pattern, and we got away from it because it was too confusing. Do we really need to re-introduce it now?
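
For comparison, a rough sketch of the alternative to removing during iteration: collect the entries to close in a first pass, then remove them from the map in a second pass. All names here (`TwoPhaseRemoval`, `closeUnassigned`, string task ids) are hypothetical and only illustrate the pattern; they are not part of `TaskManager`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating "collect first, remove afterwards".
final class TwoPhaseRemoval {
    // Removes every task whose id is not in `assigned` and returns the
    // removed ids in the map's iteration order.
    static List<String> closeUnassigned(final Map<String, String> tasks,
                                        final Set<String> assigned) {
        final List<String> toClose = new ArrayList<>();
        // Pass 1: only read from the map while iterating over it.
        for (final String id : tasks.keySet()) {
            if (!assigned.contains(id)) {
                toClose.add(id);
            }
        }
        // Pass 2: mutate the map while iterating over the separate list.
        for (final String id : toClose) {
            tasks.remove(id);
        }
        return toClose;
    }
}
```

The map is never modified while it is being iterated, so no iterator bookkeeping is needed.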




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643456981


   Retest this please.


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439549676



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -528,7 +521,8 @@ private void maybeScheduleCheckpoint() {
 
     private void writeCheckpointIfNeed() {
         if (commitNeeded) {
-            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
+            throw new IllegalStateException("A checkpoint should only be written if the previous commit has completed"
+                                                + " and there is no new commit needed.");

Review comment:
       > and there is no new commit needed -> this seems to be misleading because the commitNeeded flag is not really a guard for this case.
   
   Isn't that exactly what the `commitNeeded` flag is?
   
   That said, looking at this again, I agree this new phrasing doesn't really make sense. But the original message also took me a while to understand (shouldn't we _only_ write a checkpoint if there's a commit needed?). IIUC the point is that we should not write a checkpoint before doing the actual commit, i.e., there should not be pending uncommitted data when we write the checkpoint.




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643542951


   The sad thing is it's doomed to fail (`connect:runtime:compileTestJava` is broken)


----------------------------------------------------------------



[GitHub] [kafka] guozhangwang commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441088847



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       Ah that's a good catch. Makes sense to me.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +717,26 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

Review comment:
       Committable offsets here should contain consumed offsets, and punctuation itself should never update those consumed offsets right?
   
   I think we can skip the call if `consumedOffsetsAndMetadataPerTask` is empty.




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441101688



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +717,26 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

Review comment:
       > Committable offsets here should contain consumed offsets, and punctuation itself should never update those consumed offsets right
   
   Yes.
   
   > I think we can skip the call if consumedOffsetsAndMetadataPerTask is empty.
   
   For non-eos, yes, because for non-eos `commitOffsetsOrTransaction()` would only commit offsets. However, for eos (alpha and beta), we might have a pending transaction that we need to commit.




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439555399



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -239,54 +240,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                }
-
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : additionalTasksForCommitting) {
-                    task.postCommit();
-                }
-            } catch (final RuntimeException e) {
-                log.error("Failed to batch commit tasks, " +
-                    "will close all tasks involved in this commit as dirty by the end", e);
-                dirtyTasks.addAll(additionalTasksForCommitting);
-                dirtyTasks.addAll(tasksToClose);
-
-                tasksToClose.clear();
-                // Just add first taskId to re-throw by the end.
-                taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of clean).
-                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                dirtyTasks.add(task);
-            }
-        }
-
         for (final Task oldTask : tasksToRecycle) {
             final Task newTask;
             try {
                 if (oldTask.isActive()) {
+                    // If active, the task should have already been suspended and committed during handleRevocation

Review comment:
       Actually I forgot to update this. We should always call `suspend` since some active tasks may not have been suspended during `handleRevocation`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555142



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       `postCommit` only writes a checkpoint for non-eos. Thus, we still need to write a checkpoint for the eos case in `close()`.




----------------------------------------------------------------



[GitHub] [kafka] guozhangwang commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-644897139


   test this


----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441065091



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       The consumer tracks offsets internally; however, we buffer data in our internal queue. Thus, the offset tracked by the consumer might be larger than the offset we commit (the offset we commit is not taken from the consumer, but is based on the records we took out of the queue and processed).
   
   In eager rebalancing, the consumer clears its internal state, including the tracked offsets, if a partition is revoked (and we only suspend the task). If the partition is re-assigned, the consumer fetches the last committed offset to start fetching. Thus, if we don't clear the queue, we might fetch the same data that is already in the queue a second time.
   
   Does this make sense?
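
A toy illustration of the duplicate-fetch scenario described above, under simplifying assumptions: offsets are plain `long` values, "fetching" just appends offsets to a queue, and `BufferedRefetch` is a hypothetical name rather than anything in the Streams or consumer code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model: records already buffered in the task's queue, plus
// a consumer that restarts fetching from the last committed offset after
// a partition is revoked and re-assigned.
final class BufferedRefetch {
    // Returns the queue contents after the consumer refetches offsets
    // [committedOffset, fetchTo). If clearQueue is false, the records
    // buffered before the revocation stay in the queue.
    static Deque<Long> resumeAfterReassignment(final Deque<Long> buffered,
                                               final long committedOffset,
                                               final long fetchTo,
                                               final boolean clearQueue) {
        final Deque<Long> queue = new ArrayDeque<>(buffered);
        if (clearQueue) {
            queue.clear();
        }
        for (long offset = committedOffset; offset < fetchTo; offset++) {
            queue.addLast(offset);
        }
        return queue;
    }
}
```

If offsets 3, 4, and 5 are buffered and the committed offset is 3, resuming without clearing leaves each of those offsets in the queue twice; clearing first leaves each exactly once.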




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441081177



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +717,26 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

Review comment:
       Hm. So in the punctuation case -- where `commitNeeded` is true but `consumedOffsets` is empty -- we still need to call `commitOffsetsOrTransaction` (and `postCommit`) because the punctuation may for example write to a state store and generate changelog records. So we would need to commit that transaction, and also write the checkpoint file.
   Makes sense
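
The decision discussed in this thread could be sketched roughly as follows. This is an assumption-laden illustration, not the actual `TaskManager` logic: `CommitDecision` and `shouldCommit` are hypothetical names, and the sketch covers only whether to call `commitOffsetsOrTransaction`, not the follow-up `postCommit` and checkpoint write.

```java
// Hypothetical predicate for "do we need to issue a commit at all?".
final class CommitDecision {
    static boolean shouldCommit(final boolean eosEnabled,
                                final boolean commitNeeded,
                                final boolean consumedOffsetsEmpty) {
        if (!commitNeeded) {
            // Assumption: nothing was processed or punctuated since the last commit.
            return false;
        }
        // Non-eos: the commit only carries consumed offsets, so an empty
        // map means there is nothing to send.
        // Eos: a pending transaction may hold writes from punctuation
        // (e.g. changelog records), so commit even with no consumed offsets.
        return eosEnabled || !consumedOffsetsEmpty;
    }
}
```

For the punctuation case (`commitNeeded` true, no consumed offsets), this returns true under eos and false otherwise.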




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643532226


   Mr.J. does not like me recently... Will retry later.


----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-644988817


   Retest this please.


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441059865



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -465,44 +429,82 @@ boolean tryToCompleteRestoration() {
     }
 
     /**
+     * Handle the revoked partitions and prepare for closing the associated tasks in {@link #handleAssignment(Map, Map)}
+     * We should commit the revoked tasks now as we will not officially own them anymore when {@link #handleAssignment(Map, Map)}
+     * is called. Note that only active task partitions are passed in from the rebalance listener, so we only need to
+     * consider/commit active tasks here
+     *
+     * If eos-beta is used, we must commit ALL tasks. Otherwise, we can just commit those (active) tasks which are revoked
+     *
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
-        final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions);
+        final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
 
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        for (final Task task : tasks.values()) {
-            if (remainingPartitions.containsAll(task.inputPartitions())) {
-                task.suspend();
-                final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+        final Set<Task> tasksToCommit = new HashSet<>();
+        final Set<Task> additionalTasksForCommitting = new HashSet<>();
 
-                if (!committableOffsets.isEmpty()) {
-                    consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+        for (final Task task : activeTaskIterable()) {
+            if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
+                try {
+                    task.suspend();
+                    if (task.commitNeeded()) {
+                        tasksToCommit.add(task);
+                    }
+                } catch (final RuntimeException e) {
+                    log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
+                    firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e));
                 }
-            } else if (task.isActive() && task.commitNeeded()) {
-                final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+            } else if (task.commitNeeded()) {
+                additionalTasksForCommitting.add(task);
+            }
+            remainingRevokedPartitions.removeAll(task.inputPartitions());
+        }
 
-                if (!committableOffsets.isEmpty()) {
-                    consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                }
+        if (!remainingRevokedPartitions.isEmpty()) {
+            log.warn("The following partitions {} are missing from the task partitions. It could potentially " +
+                         "due to race condition of consumer detecting the heartbeat failure, or the tasks " +
+                         "have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
+        }
+
+        final RuntimeException suspendException = firstException.get();
+        if (suspendException != null) {
+            throw suspendException;
+        }
+
+        // If using eos-beta, if we must commit any task then we must commit all of them
+        // TODO: when KAFKA-9450 is done this will be less expensive, and we can simplify by always committing everything
+        if (processingMode ==  EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) {
+            tasksToCommit.addAll(additionalTasksForCommitting);
+        }
+
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+        for (final Task task : tasksToCommit) {
+            final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+            if (!committableOffsets.isEmpty()) {
+                consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+            } else {
+                log.warn("Task {} claimed to need a commit but had no committable consumed offsets", task.id());

Review comment:
       Oh good point. I'll remove this
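
For reference, the eos-beta rule in the diff quoted above ("if we must commit any task then we must commit all of them") can be sketched on its own. This is a simplified illustration using plain task-id strings; `RevocationCommitSetSketch` and its parameters are hypothetical names, not part of `TaskManager`.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class RevocationCommitSetSketch {

    /**
     * Returns the ids of the tasks to commit during revocation.
     *
     * @param revokedNeedingCommit     revoked active tasks with commitNeeded == true
     * @param otherActiveNeedingCommit remaining active tasks with commitNeeded == true
     * @param eosBeta                  whether the (assumed) processing mode is eos-beta
     */
    static Set<String> tasksToCommit(final Set<String> revokedNeedingCommit,
                                     final Set<String> otherActiveNeedingCommit,
                                     final boolean eosBeta) {
        final Set<String> result = new LinkedHashSet<>(revokedNeedingCommit);
        // Under eos-beta, committing any task means committing all of them;
        // if no revoked task needs a commit, nothing extra is pulled in.
        if (eosBeta && !result.isEmpty()) {
            result.addAll(otherActiveNeedingCommit);
        }
        return result;
    }
}
```

Without eos-beta only the revoked tasks are returned; with eos-beta the non-revoked tasks are added only when at least one revoked task needs a commit.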







[GitHub] [kafka] guozhangwang commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439851616



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -539,19 +537,18 @@ private void writeCheckpointIfNeed() {
     /**
      * <pre>
      * the following order must be followed:
-     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
+     *  1. commit/checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed

Review comment:
       It seems we would never commit and checkpoint the state manager anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       Yeah, I think if the actual `consumer.commit` call failed, then we should not trigger `postCommit` for any task.
   
   As for `postCommit`, I think it should never fail (we swallow any IO exception that occurs, because for non-EOS that is just fine, and for EOS we would bootstrap from scratch).
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       I re-read the current code structure and have some questions:
   
   1) We collect the checkpoint from `prepareCommit` and check that it is not null in `postCommit`, but the actual checkpoint value itself can always be collected after the commit, so all we really need to know is whether to write a checkpoint file or not. Previously this had to be decided up front since the state might transition in between, but from the source code it now seems to me that we only ever call `prepare/post` before suspend / close, so this is no longer required, i.e. we can decide whether we need to checkpoint, then collect the checkpoint map and write the file if needed, all in a single call. Is that right?
   
   2) I think I agree with you that it is cleaner to make sure that in `handleRevocation` we still transition the tasks corresponding to the revoked partitions to suspended, even if some of their commit calls failed.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555918



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       I see. Then I think it makes sense to always attempt to write the checkpoint/call `postCommit` for a task that was successfully committed, regardless of whether something went wrong during `postCommit` with a different task.
   
   And I agree, we should not make assumptions about the current code not throwing, unless it's explicitly in the contract of the method that it will never throw (which is not the case for `postCommit`).
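
A minimal sketch of that idea, assuming the commit itself has already succeeded: call `postCommit` on every committed task, keep going if one of them throws, and remember only the first exception. `PostCommitSketch`, `SketchTask`, and the `task(...)` factory are hypothetical illustrations, not the Kafka Streams `Task` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class PostCommitSketch {

    /** Hypothetical stand-in for a committed task. */
    interface SketchTask {
        String id();
        void postCommit();
    }

    /** Builds a sketch task whose postCommit optionally throws. */
    static SketchTask task(final String id, final boolean failing) {
        return new SketchTask() {
            @Override
            public String id() {
                return id;
            }

            @Override
            public void postCommit() {
                if (failing) {
                    throw new IllegalStateException(id + " failed");
                }
            }
        };
    }

    /**
     * Attempts postCommit for every task. A failure in one task does not
     * prevent the attempt for the others; only the first exception is kept.
     * Returns the ids of the tasks whose postCommit completed.
     */
    static List<String> postCommitAll(final List<SketchTask> committedTasks,
                                      final AtomicReference<RuntimeException> firstException) {
        final List<String> completed = new ArrayList<>();
        for (final SketchTask task : committedTasks) {
            try {
                task.postCommit();
                completed.add(task.id());
            } catch (final RuntimeException e) {
                firstException.compareAndSet(null, e);
            }
        }
        return completed;
    }
}
```

The caller can rethrow `firstException.get()` after the loop if it is non-null, which matches the `firstException.compareAndSet(null, e)` pattern used in the quoted diff.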







[GitHub] [kafka] guozhangwang commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439493219



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {
+                        if (task.isActive()) {
+                            log.error("Active task {} was revoked and should have already been committed", task.id());
+                            throw new IllegalStateException("Revoked active task was not committed during handleRevocation");
+                        } else {
+                            task.prepareCommit();
+                            task.postCommit();
+                        }
                     }
+                    completeTaskCloseClean(task);

Review comment:
       When closing a standby task cleanly, we would write the checkpoint file and close the state store, which also flushes it, so I think we do not need to call
   
   ```
   task.prepareCommit();
   task.postCommit();
   ```
   
   which is just to flush the stores and write checkpoint files, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -440,41 +402,35 @@ boolean tryToCompleteRestoration() {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
-        final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions);
+        final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);

Review comment:
       nit: Add in the above javadoc that we should only revoke active tasks here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       nit: add a comment above `task.suspend()` that for active tasks it should always be a no-op?







[GitHub] [kafka] guozhangwang commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439523641



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {
+                        if (task.isActive()) {
+                            log.error("Active task {} was revoked and should have already been committed", task.id());
+                            throw new IllegalStateException("Revoked active task was not committed during handleRevocation");
+                        } else {
+                            task.prepareCommit();
+                            task.postCommit();
+                        }
                     }
+                    completeTaskCloseClean(task);

Review comment:
       Is there a different PR removing that? I still see
   
   ```
   if (clean) {
                   // since there's no written offsets we can checkpoint with empty map,
                   // and the state current offset would be used to checkpoint
                   stateMgr.checkpoint(Collections.emptyMap());
                   offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
               }
   ```
   
   in Standby.close() in trunk.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439586398



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -219,13 +214,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 tasksToRecycle.add(task);
             } else {
                 try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    task.suspend(); // Should be a no-op for all active tasks, unless we hit an exception during handleRevocation
+                    if (task.commitNeeded()) {
+                        if (task.isActive()) {
+                            log.error("Active task {} was revoked and should have already been committed", task.id());
+                            throw new IllegalStateException("Revoked active task was not committed during handleRevocation");
+                        } else {
+                            task.prepareCommit();
+                            task.postCommit();
+                        }
                     }
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    taskIter.remove();

Review comment:
       Oh, I thought we removed/refactored it for other reasons. I'm happy to undo this







[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643532115









[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555142



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       `postCommit` only writes a checkpoint for non-eos. Thus, we still need to write a checkpoint in `close()` for the eos case (even if we do so blindly for all cases, as we do at the moment).
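
As a rough illustration, the state-dependent rule in the `postCommit` switch quoted elsewhere in this thread (StreamTask.java) could be written as a pure function. This sketch mirrors only that switch: `CheckpointRuleSketch` and `SketchState` are hypothetical names, states not shown in the quoted diff are deliberately left out, and whether `maybeWriteCheckpoint` applies further conditions of its own is not modeled.

```java
final class CheckpointRuleSketch {

    /** Only the states that appear in the quoted switch; other task states are omitted. */
    enum SketchState { RESTORING, RUNNING, SUSPENDED }

    /** Returns true if postCommit would attempt a checkpoint in the given state. */
    static boolean attemptsCheckpoint(final SketchState state, final boolean eosEnabled) {
        switch (state) {
            case RESTORING:
            case SUSPENDED:
                // these two cases share one branch in the quoted diff
                return true;
            case RUNNING:
                // while running, checkpoint only for non-eos
                return !eosEnabled;
            default:
                throw new IllegalStateException("State not covered by this sketch: " + state);
        }
    }
}
```

Under this reading, a running eos task never gets a checkpoint from `postCommit`, which is why a checkpoint on close would still matter for it.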







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441049817



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       I see. So we should only clear it here, and not in `close`.
   
   Just curious, why do we "forget" the current offset? I mean, haven't we just committed the current offset before suspending (and if that failed, we would close all tasks right away)? Maybe I'm misunderstanding what you mean.







[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441109010



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       Not 100% familiar with the consumer code, but in `SubscriptionState#assignFromSubscribed` new `TopicPartitionState` instances are created with `position = null`.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439702536



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       Instead of checkpointing, we can check `if clean && commitNeeded && checkpoint != null` and then throw an exception in `closeClean` (which would result in calling `closeDirty`)
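   A minimal sketch of the guard suggested above, under stated assumptions: `CloseCleanGuardSketch`, its nested `Task`, the `closedAs` field and the `close` helper are hypothetical stand-ins, not the actual `StreamTask` or `TaskManager` code. The `checkpoint` field is assumed to hold the pending checkpoint, with `null` meaning there is nothing to write.

```java
// Hypothetical sketch: a clean close refuses to proceed when a commit is still
// needed and a checkpoint is pending, and the caller falls back to a dirty close.
final class CloseCleanGuardSketch {

    static final class Task {
        private final boolean commitNeeded;
        private final Long checkpoint;      // assumption: null means "nothing to checkpoint"
        private String closedAs = "open";

        Task(final boolean commitNeeded, final Long checkpoint) {
            this.commitNeeded = commitNeeded;
            this.checkpoint = checkpoint;
        }

        void closeClean() {
            // The check from the review comment: do not close cleanly with uncommitted work.
            if (commitNeeded && checkpoint != null) {
                throw new IllegalStateException("Cannot close cleanly while a commit is still needed");
            }
            closedAs = "clean";
        }

        void closeDirty() {
            closedAs = "dirty";
        }

        String closedAs() {
            return closedAs;
        }
    }

    // Caller-side handling: any failure of the clean close results in a dirty close.
    static void close(final Task task) {
        try {
            task.closeClean();
        } catch (final RuntimeException e) {
            task.closeDirty();
        }
    }
}
```

   With this shape, a task created as `new Task(true, 42L)` ends up closed dirty, while `new Task(false, 42L)` or `new Task(true, null)` closes clean.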







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439606939



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -952,58 +951,11 @@ public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
     }
 
     @Test
-    public void shouldCleanupAnyTasksClosedAsDirtyAfterCommitException() {

Review comment:
       This was a test I added a little while back in response to a bugfix, but it no longer makes sense in the current context (in fact it's currently not really testing anything at all, since the original point was to make sure the changelog reader partitions were cleaned up but that's not even the responsibility of the TaskManager anymore)







[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439695107



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -56,21 +56,21 @@
      *          |            |              |     |
      *          |            v              |     |
      *          |     +------+--------+     |     |
-     *          |     | Suspended (3) | <---+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |     +------+--------+           |
-     *          |            |                    |
-     *          |            v                    |
-     *          |      +-----+-------+            |
-     *          +----> | Closed (4)  | -----------+
+     *          +---->| Suspended (3) | ----+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
+     *                +------+--------+           |
+     *                       |                    |
+     *                       v                    |
+     *                 +-----+-------+            |
+     *                 | Closed (4)  | -----------+
      *                 +-------------+
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3),            // 0
+        RESTORING(2, 3),          // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3

Review comment:
       So far, we have not allowed idempotent state transitions in the state machine itself, but handled them on the caller side. -- It seems inconsistent to allow `SUSPENDED -> SUSPENDED` but not `CREATED -> CREATED` etc.
   
   I would recommend keeping the current pattern and not calling `transitionTo()` if the task is already in the target state. -- I would also be happy to change it, but in that case we should change it for _all_ cases. However, this would enlarge the scope of this PR, and I think it's better _not_ to do it in this PR.
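   To illustrate the caller-side pattern described above, here is a simplified sketch. `TaskStateSketch` is a hypothetical stand-in, not the real `Task`/`StreamTask` code; the transition table is an assumption that follows the recommendation in this comment (no `SUSPENDED -> SUSPENDED` entry), not necessarily what was merged.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: the state machine rejects self-transitions, and the
// caller (suspend) skips transitionTo() when already in the target state.
final class TaskStateSketch {

    enum State {
        CREATED(1, 3),      // 0
        RESTORING(2, 3),    // 1
        RUNNING(3),         // 2
        SUSPENDED(1, 4),    // 3, assumption: no idempotent SUSPENDED -> SUSPENDED entry
        CLOSED(0);          // 4

        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        boolean isValidTransition(final State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }

    private State state = State.CREATED;

    State state() {
        return state;
    }

    void transitionTo(final State newState) {
        if (!state.isValidTransition(newState)) {
            throw new IllegalStateException("Invalid transition from " + state + " to " + newState);
        }
        state = newState;
    }

    void suspend() {
        switch (state) {
            case CREATED:
            case RESTORING:
            case RUNNING:
                transitionTo(State.SUSPENDED);
                break;
            case SUSPENDED:
                // Already in the target state: handled here as a no-op,
                // so the state machine itself never sees a self-transition.
                break;
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state + " while suspending task");
        }
    }
}
```

   Calling `suspend()` twice leaves the task in `SUSPENDED` without throwing, while a direct `transitionTo(State.SUSPENDED)` from `SUSPENDED` would still be rejected.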

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -250,14 +250,10 @@ public void completeRestoration() {
     public void suspend() {
         switch (state()) {
             case CREATED:
-            case SUSPENDED:

Review comment:
       IMHO, we should keep the `SUSPENDED` case for consistency reasons. Only merge `CREATED` and `RESTORING` (cf. my other comment on `Task.java`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -112,10 +112,19 @@ public void completeRestoration() {
     @Override
     public void suspend() {
         log.trace("No-op suspend with state {}", state());
-        if (state() == State.RUNNING) {
-            transitionTo(State.SUSPENDED);
-        } else if (state() == State.RESTORING) {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending standby task " + id);
+        switch (state()) {
+            case CREATED:
+            case RUNNING:
+            case SUSPENDED:

Review comment:
       The `SUSPENDED` case should be a no-op IMHO and not call `transitionTo()` (compare my other comment in `Task.java`).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -474,20 +470,17 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
 
     @Override
     public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
+        // Stream tasks should have already been suspended and their consumed offsets committed before recycling

Review comment:
       Is the comment necessary? The code seems self-explanatory.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();

Review comment:
       Nit: Why `LinkedList<Task> tasksToClose`? Should we only declare it as `List`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -56,21 +56,21 @@
      *          |            |              |     |
      *          |            v              |     |
      *          |     +------+--------+     |     |
-     *          |     | Suspended (3) | <---+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |     +------+--------+           |
-     *          |            |                    |
-     *          |            v                    |
-     *          |      +-----+-------+            |
-     *          +----> | Closed (4)  | -----------+
+     *          +---->| Suspended (3) | ----+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429

Review comment:
       Why remove the `<` arrow? We can still transition from `RESTORING` to `SUSPENDED`.
   
   Super-nit: `+---->| Suspended` -> `+---> | Suspended`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       Why do we do the `try-catch` as the outer layer? If an exception occurs, we would stop looping through the tasks to call `postCommit()` -- is this intended? If yes, why?
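   The difference in question can be sketched as follows. This is only an illustration under stated assumptions: `PostCommitLoopSketch`, its minimal `Task` interface and the helper methods are hypothetical, and the sketch does not claim which behavior the PR intends.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch contrasting an outer try-catch around the whole loop
// with a try-catch per task. Both record only the first exception.
final class PostCommitLoopSketch {

    // Minimal stand-in for a task; only postCommit() matters here.
    interface Task {
        void postCommit();
    }

    // Outer try-catch: the first failure ends the loop, later tasks are skipped.
    static int outerTryCatch(final List<Task> tasks,
                             final AtomicReference<RuntimeException> firstException) {
        int attempted = 0;
        try {
            for (final Task task : tasks) {
                attempted++;
                task.postCommit();
            }
        } catch (final RuntimeException e) {
            firstException.compareAndSet(null, e);
        }
        return attempted;
    }

    // Per-task try-catch: a failure is recorded and the loop continues.
    static int innerTryCatch(final List<Task> tasks,
                             final AtomicReference<RuntimeException> firstException) {
        int attempted = 0;
        for (final Task task : tasks) {
            attempted++;
            try {
                task.postCommit();
            } catch (final RuntimeException e) {
                firstException.compareAndSet(null, e);
            }
        }
        return attempted;
    }

    // Three tasks where the second one fails in postCommit().
    static List<Task> threeTasksWithMiddleFailure() {
        final Task ok = () -> { };
        final Task failing = () -> {
            throw new IllegalStateException("postCommit failed");
        };
        return Arrays.asList(ok, failing, ok);
    }
}
```

   For the three sample tasks, the outer variant attempts two tasks and the per-task variant attempts all three.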

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       If we hit an exception in `handleRevocation`, why would we continue here? Are we still in a "clean enough" state to actually continue?
   
   Below we call `completeTaskCloseClean(task)`, which seems incorrect for this case, as it might close the task cleanly even if we did not successfully commit before.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439522978



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       ack







[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-644988683


   Retest this please.





[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439538197



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -528,7 +521,8 @@ private void maybeScheduleCheckpoint() {
 
     private void writeCheckpointIfNeed() {
         if (commitNeeded) {
-            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
+            throw new IllegalStateException("A checkpoint should only be written if the previous commit has completed"
+                                                + " and there is no new commit needed.");

Review comment:
       `and there is no new commit needed` -> this seems misleading because the `commitNeeded` flag is not really a guard for this case. -- Also, `if the previous commit has completed` is something we don't really know here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),         // 0
+        RESTORING(2, 3, 4),       // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3
+        CLOSED(0);                // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks

Review comment:
       Can we update the comment with the state transitions above, too?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),         // 0
+        RESTORING(2, 3, 4),       // 1

Review comment:
       Seems we need to transit from RESTORING to SUSPENDED now, before closing, and never directly from RESTORING to CLOSED?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       Not sure if I can follow -- if it's a no-op, why do we call it? Or are you saying we need to call it for standbys, as we don't suspend them previously?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -239,54 +240,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                }
-
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : additionalTasksForCommitting) {
-                    task.postCommit();
-                }
-            } catch (final RuntimeException e) {
-                log.error("Failed to batch commit tasks, " +
-                    "will close all tasks involved in this commit as dirty by the end", e);
-                dirtyTasks.addAll(additionalTasksForCommitting);
-                dirtyTasks.addAll(tasksToClose);
-
-                tasksToClose.clear();
-                // Just add first taskId to re-throw by the end.
-                taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of clean).
-                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                dirtyTasks.add(task);
-            }
-        }
-
         for (final Task oldTask : tasksToRecycle) {
             final Task newTask;
             try {
                 if (oldTask.isActive()) {
+                    // If active, the task should have already been suspended and committed during handleRevocation

Review comment:
       Above, we call `suspend()` blindly and have a comment that for active it's a no-op. -- Might be good to align both cases to use the same pattern (I don't care which one we pick)?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439551380



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),         // 0
+        RESTORING(2, 3, 4),       // 1

Review comment:
       Ack good catch. Same with CREATED







[GitHub] [kafka] guozhangwang merged pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #8856:
URL: https://github.com/apache/kafka/pull/8856


   





[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643532285


   Haha. After complaining Mr.J. changes his mind :D





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439522753



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -440,41 +402,35 @@ boolean tryToCompleteRestoration() {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
-        final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions);
+        final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);

Review comment:
       ack







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439699960



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();

Review comment:
       ack

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -56,21 +56,21 @@
      *          |            |              |     |
      *          |            v              |     |
      *          |     +------+--------+     |     |
-     *          |     | Suspended (3) | <---+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |     +------+--------+           |
-     *          |            |                    |
-     *          |            v                    |
-     *          |      +-----+-------+            |
-     *          +----> | Closed (4)  | -----------+
+     *          +---->| Suspended (3) | ----+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
+     *                +------+--------+           |
+     *                       |                    |
+     *                       v                    |
+     *                 +-----+-------+            |
+     *                 | Closed (4)  | -----------+
      *                 +-------------+
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3),            // 0
+        RESTORING(2, 3),          // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3

Review comment:
       I see. I was just thinking we should make the idempotency explicit for each state by allowing/disallowing the transition, but I agree we can do that in a followup PR
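
A minimal sketch of what "explicit idempotency" could look like in the transition table, assuming the ordinal-based encoding shown in the diff above. `TaskState` is a hypothetical stand-in for `Task.State`, and `CLOSED(0)` is taken from the earlier revision of this hunk. A state that lists its own ordinal allows a repeated call as a no-op transition, while a state that omits it rejects the repeat:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for Task.State; the allowed ordinals mirror the diff above.
enum TaskState {
    CREATED(1, 3),        // 0
    RESTORING(2, 3),      // 1
    RUNNING(3),           // 2: does not list itself, so RUNNING -> RUNNING is rejected
    SUSPENDED(1, 3, 4),   // 3: lists itself, so SUSPENDED -> SUSPENDED is allowed
    CLOSED(0);            // 4

    private final Set<Integer> validTransitions = new HashSet<>();

    TaskState(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    // True if moving from this state to newState is permitted by the table.
    boolean isValidTransition(final TaskState newState) {
        return validTransitions.contains(newState.ordinal());
    }
}
```

With this encoding, making a transition idempotent (or not) is a one-character change per state, which keeps the decision visible in one place.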

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -474,20 +470,17 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
 
     @Override
     public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
+        // Stream tasks should have already been suspended and their consumed offsets committed before recycling

Review comment:
       Yeah it does seem unnecessary 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       Well if `commit` throws an exception, then we shouldn't call `postCommit` right? 
   
   Or are you saying if  `commit` succeeds but `postCommit` throws for one task, we should still loop through and try to `postCommit` all the other tasks?
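
One possible reading of the second option, sketched under assumptions: if the commit itself throws, no task is post-committed; if the commit succeeds, every task gets a `postCommit` attempt and only the first failure is kept. `PostCommittable` and `CommitSketch` are hypothetical names for illustration, not part of `TaskManager`:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal view of a task for this sketch.
interface PostCommittable {
    void postCommit();
}

final class CommitSketch {
    // Returns the first exception encountered, or null if everything succeeded.
    static RuntimeException commitThenPostCommit(final Runnable commit,
                                                 final List<PostCommittable> tasks) {
        try {
            commit.run();
        } catch (final RuntimeException e) {
            // The commit failed, so postCommit must not run for any task.
            return e;
        }

        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        for (final PostCommittable task : tasks) {
            try {
                task.postCommit();
            } catch (final RuntimeException e) {
                // Remember only the first failure, but keep going for the remaining tasks.
                firstException.compareAndSet(null, e);
            }
        }
        return firstException.get();
    }
}
```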

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       If we hit an exception in `handleRevocation` on some task then we would skip out on suspending the rest of the tasks, i.e. the set of not-suspended tasks does not contain the task that threw (of course if one task threw an exception then it's likely others will too, but not guaranteed).
   
   But maybe it's cleaner to catch exceptions during `handleRevocation` and at least make sure every task gets suspended? I'll try that
   
   On a related note, if we _always_  have to commit before closing (or at least attempt to), should we just remove the `writeCheckpointIfNeeded` call from `closeClean`? Seems like the `pre/postCommit` should be responsible for whether to checkpoint, not `close`. In this case, it's completely fine to _attempt_ a clean close of a dirty task, as the `closeClean` method will just maybe throw in which case we can close dirty. WDYT?
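
A rough sketch of the "attempt clean, fall back to dirty" idea from the last paragraph, assuming `closeClean` signals failure by throwing a `RuntimeException`. `ClosableTaskSketch` and `CloseSketch` are hypothetical names; the real `Task` interface has more to it:

```java
// Hypothetical minimal view of a task for this sketch.
interface ClosableTaskSketch {
    void closeClean();
    void closeDirty();
}

final class CloseSketch {
    // Tries a clean close first; if that throws, closes the task dirty instead.
    // Returns true if the clean close succeeded.
    static boolean closeCleanOrDirty(final ClosableTaskSketch task) {
        try {
            task.closeClean();
            return true;
        } catch (final RuntimeException e) {
            // A half-closed task is no good to anyone, so finish the close dirty.
            task.closeDirty();
            return false;
        }
    }
}
```

The caller would still want to record the exception somewhere (as `taskCloseExceptions` does in the diff), which this sketch leaves out for brevity.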

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -56,21 +56,21 @@
      *          |            |              |     |
      *          |            v              |     |
      *          |     +------+--------+     |     |
-     *          |     | Suspended (3) | <---+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |     +------+--------+           |
-     *          |            |                    |
-     *          |            v                    |
-     *          |      +-----+-------+            |
-     *          +----> | Closed (4)  | -----------+
+     *          +---->| Suspended (3) | ----+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429

Review comment:
       The diff makes it hard to tell, but I "merged" the path to SUSPENDED from CREATED and RESTORING. I find it a bit easier to follow when all the arrows are unidirectional





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439550755



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -528,7 +521,8 @@ private void maybeScheduleCheckpoint() {
 
     private void writeCheckpointIfNeed() {
         if (commitNeeded) {
-            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
+            throw new IllegalStateException("A checkpoint should only be written if the previous commit has completed"
+                                                + " and there is no new commit needed.");

Review comment:
       I'll revert it to the original and add an error log message: does
   ```log.error("Tried to write a checkpoint with pending uncommitted data, should complete the commit first.");```
   make sense to you?
   
   





[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441101688



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +717,26 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

Review comment:
       > Committable offsets here should contain consumed offsets, and punctuation itself should never update those consumed offsets right
   
   Yes.
   
   > I think we can skip the call if consumedOffsetsAndMetadataPerTask is empty.
   
   For non-eos, yes, because for non-eos `commitOffsetsOrTransaction()` would only commit offsets via the consumer (this can be skipped if empty). However, for eos (alpha and beta), we might have a pending transaction that we need to commit on the producer, too.
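
The distinction above can be sketched as a small decision helper. This is only an illustration under stated assumptions: `eosEnabled` and `transactionInFlight` are hypothetical flags, and the offset map is simplified to `String -> Long` rather than the real `TaskId`/`TopicPartition`/`OffsetAndMetadata` types:

```java
import java.util.Map;

final class CommitDecisionSketch {
    // Returns true if the commit call should be made.
    // Non-eos: only consumer offsets are committed, so an empty map can be skipped.
    // Eos: a pending producer transaction may still need committing even with no offsets.
    static boolean shouldCommit(final boolean eosEnabled,
                                final boolean transactionInFlight,
                                final Map<String, Long> consumedOffsets) {
        if (!consumedOffsets.isEmpty()) {
            return true;
        }
        return eosEnabled && transactionInFlight;
    }
}
```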





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439529480



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {
+                        if (task.isActive()) {
+                            log.error("Active task {} was revoked and should have already been committed", task.id());
+                            throw new IllegalStateException("Revoked active task was not committed during handleRevocation");
+                        } else {
+                            task.prepareCommit();
+                            task.postCommit();
+                        }
                     }
+                    completeTaskCloseClean(task);

Review comment:
       I suspect your trunk is out of date 🙂 
   
   (that code & comment is now in `postCommit`)





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439545899



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       We need to do it for all standbys, and we may need to do it for some actives. Since suspending is now idempotent we may as well just call `suspend` universally
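
For illustration, an idempotent `suspend` might look roughly like the following. This is a simplified, hypothetical task (not the actual `StreamTask`), with a counter standing in for the work that should happen only on the first suspension of a running task:

```java
// Hypothetical, simplified task used only to illustrate idempotent suspension.
final class SuspendableTaskSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.CREATED;
    private int topologyCloseCount = 0; // how often the (simulated) topology was closed

    // Test helper for this sketch: jump straight to a given state.
    void transitionTo(final State newState) {
        state = newState;
    }

    void suspend() {
        switch (state) {
            case CREATED:
            case RESTORING:
                state = State.SUSPENDED;
                break;
            case RUNNING:
                topologyCloseCount++; // stands in for closing the topology
                state = State.SUSPENDED;
                break;
            case SUSPENDED:
                // Already suspended: calling suspend again is a no-op.
                break;
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state + " while suspending task");
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    State state() {
        return state;
    }

    int topologyCloseCount() {
        return topologyCloseCount;
    }
}
```

Because the second call does nothing, a caller such as `handleAssignment` can suspend every task it is about to close without tracking which ones were already suspended during revocation.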





[GitHub] [kafka] ableegoldman commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-645043278


   Failures were all due to known-flaky/completely broken `ReassignPartitionsUnitTest.testModifyBrokerThrottles` (PR for that test is still waiting to be merged -- https://issues.apache.org/jira/browse/KAFKA-10147)



[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441046024



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       I think we still need to make this call -- in eager rebalancing, we suspend a task when we get a partition revoked. For this case, we "forget" the current offset within the consumer and thus need to clear the partition group. Otherwise, we might read the data a second time if the partition is reassigned (which would violate EOS).





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441089397



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       Got it, thanks for the explanation. I'll move it back to `postCommit` with a note





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439689801



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       The current comment is
   ```
   // Should be a no-op for active tasks, unless we hit an exception during handleRevocation
   ```





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441092703



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       I'm not 100% certain that the Consumer _does_ clear its internal buffer on revocation. At least, I couldn't find it in the code, but maybe I'm looking in the wrong place. 
   
   Not arguing we shouldn't clear the partition group here, was just wondering about this for my own sake. Hm





[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441056149



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -465,44 +429,82 @@ boolean tryToCompleteRestoration() {
     }
 
     /**
+     * Handle the revoked partitions and prepare for closing the associated tasks in {@link #handleAssignment(Map, Map)}
+     * We should commit the revoked tasks now as we will not officially own them anymore when {@link #handleAssignment(Map, Map)}
+     * is called. Note that only active task partitions are passed in from the rebalance listener, so we only need to
+     * consider/commit active tasks here
+     *
+     * If eos-beta is used, we must commit ALL tasks. Otherwise, we can just commit those (active) tasks which are revoked
+     *
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
-        final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions);
+        final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
 
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        for (final Task task : tasks.values()) {
-            if (remainingPartitions.containsAll(task.inputPartitions())) {
-                task.suspend();
-                final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+        final Set<Task> tasksToCommit = new HashSet<>();
+        final Set<Task> additionalTasksForCommitting = new HashSet<>();
 
-                if (!committableOffsets.isEmpty()) {
-                    consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+        for (final Task task : activeTaskIterable()) {
+            if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
+                try {
+                    task.suspend();
+                    if (task.commitNeeded()) {
+                        tasksToCommit.add(task);
+                    }
+                } catch (final RuntimeException e) {
+                    log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
+                    firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e));
                 }
-            } else if (task.isActive() && task.commitNeeded()) {
-                final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+            } else if (task.commitNeeded()) {
+                additionalTasksForCommitting.add(task);
+            }
+            remainingRevokedPartitions.removeAll(task.inputPartitions());
+        }
 
-                if (!committableOffsets.isEmpty()) {
-                    consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                }
+        if (!remainingRevokedPartitions.isEmpty()) {
+            log.warn("The following partitions {} are missing from the task partitions. It could potentially " +
+                         "due to race condition of consumer detecting the heartbeat failure, or the tasks " +
+                         "have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
+        }
+
+        final RuntimeException suspendException = firstException.get();
+        if (suspendException != null) {
+            throw suspendException;
+        }
+
+        // If using eos-beta, if we must commit any task then we must commit all of them
+        // TODO: when KAFKA-9450 is done this will be less expensive, and we can simplify by always committing everything
+        if (processingMode ==  EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) {
+            tasksToCommit.addAll(additionalTasksForCommitting);
+        }
+
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+        for (final Task task : tasksToCommit) {
+            final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+            if (!committableOffsets.isEmpty()) {
+                consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+            } else {
+                log.warn("Task {} claimed to need a commit but had no committable consumed offsets", task.id());

Review comment:
       Is this necessarily a warning? A wall-clock-time punctuation could have set `commitNeeded` to `true`?
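
A toy illustration of the scenario in question, assuming (as the comment suggests) that a punctuation marks the task as needing a commit without consuming any records. `PunctuatingTaskSketch` is hypothetical, and offsets are simplified to a `String -> Long` map:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified task; only tracks what is needed for this example.
final class PunctuatingTaskSketch {
    private final Map<String, Long> consumedOffsets = new HashMap<>();
    private boolean commitNeeded = false;

    // Processing a record advances the committable offset (next offset to read).
    void process(final String partition, final long offset) {
        consumedOffsets.put(partition, offset + 1);
        commitNeeded = true;
    }

    // Assumption: a punctuation may produce output, so it also requests a commit,
    // but it does not touch the consumed offsets.
    void punctuate() {
        commitNeeded = true;
    }

    boolean commitNeeded() {
        return commitNeeded;
    }

    Map<String, Long> prepareCommit() {
        return new HashMap<>(consumedOffsets);
    }
}
```

In this model a task that has only punctuated reports `commitNeeded() == true` while `prepareCommit()` returns an empty map, which would be a legitimate state rather than something to warn about.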





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439552656



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),         // 0
+        RESTORING(2, 3, 4),       // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3
+        CLOSED(0);                // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks

Review comment:
       :/ yeah...
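
   For readers following the diff, the numbers in each constructor are the ordinals of the states that may follow. A minimal sketch of how such a table can be checked, assuming a varargs constructor and an `isValidTransition` helper (the wrapper class `TaskStateSketch` is hypothetical):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class TaskStateSketch {
    enum State {
        CREATED(1, 3, 4),    // 0
        RESTORING(2, 3, 4),  // 1
        RUNNING(3),          // 2
        SUSPENDED(1, 3, 4),  // 3
        CLOSED(0);           // 4

        // Ordinals of the states this state may transition to.
        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        boolean isValidTransition(final State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }

    public static void main(final String[] args) {
        // With the new table, suspend() is legal from CREATED and idempotent in SUSPENDED.
        System.out.println(State.CREATED.isValidTransition(State.SUSPENDED));
        System.out.println(State.SUSPENDED.isValidTransition(State.SUSPENDED));
        System.out.println(State.RUNNING.isValidTransition(State.CLOSED));
    }
}
```

   Under the new table a task in `RUNNING` still has to pass through `SUSPENDED` before it can be closed, while `CREATED` and `SUSPENDED` both accept a `suspend` call.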







[GitHub] [kafka] ableegoldman commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643003389


   @guozhangwang @mjsax @vvcephei 





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439524659



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       Actually it's not _always_ a no-op, since we will bail on suspending/committing the remaining active tasks if any of them throws an exception in `handleRevocation` -- so it's possible some active tasks are not yet suspended at this point.
   I will add a comment to clarify.
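
   A rough illustration of that case, assuming tasks are identified by plain strings and that suspension is tracked in a set. `RevocationBailSketch` and both of its methods are hypothetical and only mimic the control flow described here:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of "bail on first failure" during revocation.
class RevocationBailSketch {
    private final Set<String> suspended = new LinkedHashSet<>();

    // The first failing task aborts the loop, so later tasks are never suspended.
    void handleRevocation(final List<String> taskIds, final String failingTask) {
        for (final String id : taskIds) {
            if (id.equals(failingTask)) {
                throw new RuntimeException("suspend failed for task " + id);
            }
            suspended.add(id);
        }
    }

    // Returns true if this call did real work, false if the task was already suspended.
    boolean suspendIfNeeded(final String id) {
        return suspended.add(id);
    }

    public static void main(final String[] args) {
        final RevocationBailSketch sketch = new RevocationBailSketch();
        try {
            sketch.handleRevocation(Arrays.asList("0_0", "0_1", "0_2"), "0_1");
        } catch (final RuntimeException e) {
            System.out.println("revocation bailed: " + e.getMessage());
        }
        System.out.println("0_0 suspended later: " + sketch.suspendIfNeeded("0_0"));
        System.out.println("0_2 suspended later: " + sketch.suspendIfNeeded("0_2"));
    }
}
```

   Task `0_2` is only suspended by the later call, which is why `suspend` in `handleAssignment` cannot be treated as a guaranteed no-op.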







[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441059100



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +717,26 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

Review comment:
       Thinking about punctuation, should we actually call `commitOffsetsOrTransaction()` unconditionally (i.e., regardless of whether `consumedOffsetsAndMetadataPerTask` is empty)?
   
   We can still move the check inside `commitOffsetsOrTransaction()`, but for EOS there might be pending writes from punctuation that we still need to commit?







[GitHub] [kafka] ableegoldman commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643022606


   Also failed with unrelated (extremely) flaky 
   `ReassignPartitionsUnitTest.testModifyBrokerThrottles`
   and
    `SslSelectorTest.testCloseOldestConnection` (tickets created)





[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441116141



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -690,18 +686,21 @@ void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
         final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
         for (final Task task : tasks.values()) {
             if (clean) {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {
+                        tasksToCommit.add(task);
+                        final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        if (task.isActive()) {

Review comment:
       As @mjsax pointed out, we should still commit even if there are no consumed offsets. However, we should not commit the offsets/transaction if there are no active tasks that need committing
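
   The rule can be sketched as follows, assuming a simplified task type. `ShutdownCommitSketch`, `SketchTask`, and `offsetsToCommit` are hypothetical names, not part of `TaskManager`. An empty `Optional` means "do not commit", while a present but empty map means "commit anyway, with no consumed offsets".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class ShutdownCommitSketch {
    // Hypothetical, simplified task; the real Task interface has many more methods.
    static final class SketchTask {
        final String id;
        final boolean active;
        final boolean commitNeeded;
        final Map<String, Long> committableOffsets;

        SketchTask(final String id, final boolean active, final boolean commitNeeded,
                   final Map<String, Long> committableOffsets) {
            this.id = id;
            this.active = active;
            this.commitNeeded = commitNeeded;
            this.committableOffsets = committableOffsets;
        }
    }

    // Commit only on clean shutdown and only if some active task needs committing,
    // even when that task has no consumed offsets (e.g. punctuation-only writes).
    static Optional<Map<String, Map<String, Long>>> offsetsToCommit(final boolean clean,
                                                                    final List<SketchTask> tasks) {
        if (!clean) {
            return Optional.empty();
        }
        final Map<String, Map<String, Long>> offsetsPerTask = new HashMap<>();
        boolean activeTaskNeedsCommit = false;
        for (final SketchTask task : tasks) {
            if (task.active && task.commitNeeded) {
                activeTaskNeedsCommit = true;
                if (!task.committableOffsets.isEmpty()) {
                    offsetsPerTask.put(task.id, task.committableOffsets);
                }
            }
        }
        return activeTaskNeedsCommit ? Optional.of(offsetsPerTask) : Optional.empty();
    }

    public static void main(final String[] args) {
        final List<SketchTask> punctuatedOnly = Collections.singletonList(
            new SketchTask("0_0", true, true, Collections.<String, Long>emptyMap()));
        System.out.println("commit issued: " + offsetsToCommit(true, punctuatedOnly).isPresent());
    }
}
```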







[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439688238



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       For this case, it won't be a no-op for some active tasks? So we should not have such a comment?







[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441049200



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -502,56 +494,24 @@ public void closeAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
-    private void maybeScheduleCheckpoint() {
-        switch (state()) {
-            case RESTORING:
-            case SUSPENDED:
-                this.checkpoint = checkpointableOffsets();
-
-                break;
-
-            case RUNNING:
-                if (!eosEnabled) {
-                    this.checkpoint = checkpointableOffsets();
-                }
-
-                break;
-
-            case CREATED:
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
-        }
-    }
-
-    private void writeCheckpointIfNeed() {
+    private void maybeWriteCheckpoint() {

Review comment:
       Seems we call `stateMgr.checkpoint` unconditionally now. Should we rename this to `writeCheckpoint`? Or even remove it altogether, as the `if (commitNeeded)` check is "just" a guard and the method is a one-liner now?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441081446



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -502,56 +494,24 @@ public void closeAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
-    private void maybeScheduleCheckpoint() {
-        switch (state()) {
-            case RESTORING:
-            case SUSPENDED:
-                this.checkpoint = checkpointableOffsets();
-
-                break;
-
-            case RUNNING:
-                if (!eosEnabled) {
-                    this.checkpoint = checkpointableOffsets();
-                }
-
-                break;
-
-            case CREATED:
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
-        }
-    }
-
-    private void writeCheckpointIfNeed() {
+    private void maybeWriteCheckpoint() {

Review comment:
       Well, the `commitNeeded` guard is a good idea imo. But I agree we should rename this
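
   One possible shape after the rename, as a sketch only: the guard stays at the call site and the renamed method writes unconditionally. `CheckpointGuardSketch`, `markDirty`, `postCommit`, and the counter are hypothetical, and resetting `commitNeeded` after the write is an assumption of this sketch rather than something stated in the diff.

```java
// Hypothetical model of a guarded checkpoint write; not the real StreamTask code.
class CheckpointGuardSketch {
    private boolean commitNeeded = false;
    private int checkpointsWritten = 0;

    // Any processing or punctuation marks the task as needing a commit.
    void markDirty() {
        commitNeeded = true;
    }

    // The caller keeps the guard, so a clean task skips the checkpoint write.
    void postCommit() {
        if (commitNeeded) {
            writeCheckpoint();
            commitNeeded = false; // assumption: the flag is cleared once committed
        }
    }

    // Unconditional one-liner, standing in for a call like stateMgr.checkpoint(...).
    private void writeCheckpoint() {
        checkpointsWritten++;
    }

    int checkpointsWritten() {
        return checkpointsWritten;
    }

    public static void main(final String[] args) {
        final CheckpointGuardSketch task = new CheckpointGuardSketch();
        task.postCommit();   // nothing to commit, no checkpoint written
        task.markDirty();
        task.postCommit();   // guard passes, checkpoint written
        System.out.println("checkpoints written: " + task.checkpointsWritten());
    }
}
```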



