Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/06 00:31:42 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

mjsax commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r466056481



##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
##########
@@ -379,9 +379,18 @@ private static void verifyReceivedAllRecords(final Map<TopicPartition, List<Cons
         final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
         for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
             final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
-            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = expectedRecords.get(inputTopicPartition).iterator();
+            final List<ConsumerRecord<byte[], byte[]>> receivedRecordsForPartition = partitionRecords.getValue();
+            final List<ConsumerRecord<byte[], byte[]>> expectedRecordsForPartition = expectedRecords.get(inputTopicPartition);
+
+            System.out.println(partitionRecords.getKey() + " with " + receivedRecordsForPartition.size() + ", " +
+                    inputTopicPartition + " with " + expectedRecordsForPartition.size());
+
+            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = expectedRecordsForPartition.iterator();
+            RuntimeException exception = null;
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : receivedRecordsForPartition) {
+                if (!expectedRecord.hasNext())

Review comment:
       Nit: add `{ }`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -461,6 +463,42 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {
+                        store.flush();
+                    } else if (store instanceof CachedStateStore) {
+                        ((CachedStateStore) store).flushCache();
+                    }
+                    log.trace("Flushed cache or buffer {}", store.name());
+                } catch (final RuntimeException exception) {
+                    if (firstException == null) {
+                        // do NOT wrap the error if it is actually caused by Streams itself
+                        if (exception instanceof StreamsException) {
+                            firstException = exception;
+                        } else {
+                            firstException = new ProcessorStateException(
+                                    format("%sFailed to flush cache of store %s", logPrefix, store.name()), exception);

Review comment:
       nit: formatting:
   ```
   firstException = new ProcessorStateException(
       format("%sFailed to flush cache of store %s", logPrefix, store.name()),
       exception
   );
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -49,6 +61,30 @@
         this.stateDirectory = stateDirectory;
     }
 
+    protected void initializeCheckpoint() {
+        // we will delete the local checkpoint file after registering the state stores and loading them into the

Review comment:
       Why do we actually delete the checkpoint file after registering? For non-eos, it seems we only need to delete the checkpoint file if we wipe out the whole store?
   
   Or is it a "simplification" to not distinguish between eos/non-eos in this case?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -267,80 +266,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                tasksToClose.add(task);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                if (task.isActive()) {
-                    // Active tasks are revoked and suspended/committed during #handleRevocation
-                    if (!task.state().equals(State.SUSPENDED)) {
-                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
-                                  task.id(), task.state());
-                        throw new IllegalStateException("Active task " + task.id() + " should have been suspended");
-                    }
-                } else {
-                    task.suspend();
-                    task.prepareCommit();
-                    task.postCommit();
-                }
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format(
-                    "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                    task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of clean).
-                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                dirtyTasks.add(task);
-            }
-        }
-
-        for (final Task oldTask : tasksToRecycle) {
-            final Task newTask;
-            try {
-                if (oldTask.isActive()) {
-                    if (!oldTask.state().equals(State.SUSPENDED)) {
-                        // Active tasks are revoked and suspended/committed during #handleRevocation
-                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
-                                  oldTask.id(), oldTask.state());
-                        throw new IllegalStateException("Active task " + oldTask.id() + " should have been suspended");
-                    }
-                    final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
-                    newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
-                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
-                } else {
-                    oldTask.suspend();
-                    oldTask.prepareCommit();
-                    oldTask.postCommit();
-                    final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id());
-                    newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer);
-                }
-                tasks.remove(oldTask.id());
-                addNewTask(newTask);
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(oldTask.id(), e);
-                dirtyTasks.add(oldTask);
+                tasksToCloseClean.add(task);
             }
         }
 
-        for (final Task task : dirtyTasks) {
-            closeTaskDirty(task);
-            cleanUpTaskProducer(task, taskCloseExceptions);
-            tasks.remove(task.id());
-        }
+        // close and recycle those tasks
+        handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, taskCloseExceptions);
 
         if (!taskCloseExceptions.isEmpty()) {
+            log.error("Hit exceptions while closing / recycling tasks: {}", taskCloseExceptions);

Review comment:
       Seems like double logging? We have a `log.error` each time before `taskCloseExceptions.put()` is called in `handleCloseAndRecycle`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit offsets but only need to
+                    // flush / checkpoint state stores, so we only need to call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of clean).
+                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {

Review comment:
       This seems to be a "hack" -- IMHO, a task should be in only one of the sets/lists, never in both. Can we change the first loop to use an explicit iterator and remove a task from `tasksToCloseClean` when we add it to `tasksToCloseDirty`?
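
   A minimal sketch of the explicit-iterator approach suggested above, using plain strings in place of `Task` objects. The class `CloseListPartition`, the helper `moveFailedToDirty`, and the `checkpointSucceeded` predicate are hypothetical names for illustration and are not part of the PR:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.function.Predicate;

   public class CloseListPartition {
       // Hypothetical helper: every task that fails the check is removed from the
       // clean list and added to the dirty list, so it is never in both.
       static <T> void moveFailedToDirty(final List<T> toCloseClean,
                                         final List<T> toCloseDirty,
                                         final Predicate<T> checkpointSucceeded) {
           final Iterator<T> iterator = toCloseClean.iterator();
           while (iterator.hasNext()) {
               final T task = iterator.next();
               if (!checkpointSucceeded.test(task)) {
                   // Iterator.remove() is the safe way to drop an element mid-iteration
                   iterator.remove();
                   toCloseDirty.add(task);
               }
           }
       }

       public static void main(final String[] args) {
           final List<String> clean = new ArrayList<>(Arrays.asList("0_0", "0_1", "1_0"));
           final List<String> dirty = new ArrayList<>();
           // Assumption for the demo: task "0_1" is the one whose checkpoint fails
           moveFailedToDirty(clean, dirty, id -> !id.equals("0_1"));
           System.out.println("clean=" + clean + ", dirty=" + dirty);
       }
   }
   ```

   With the lists kept disjoint this way, the later loops would no longer need the `!tasksToCloseDirty.contains(task)` guard. Note that the first loop in `handleCloseAndRecycle` walks a combined copy of `tasksToCloseClean` and `tasksToRecycle`, so the same treatment would presumably have to be applied to each list in turn.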

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -344,40 +343,41 @@ public void resume() {
     }
 
     /**
+     * @throws StreamsException fatal error that should cause the thread to die
+     * @throws TaskMigratedException recoverable error that would cause the task to be removed

Review comment:
       It seems like almost every method might throw `StreamsException` and/or `TaskMigratedException` -- is it really worth having those comments all over the place?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit offsets but only need to
+                    // flush / checkpoint state stores, so we only need to call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of clean).
+                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    tasks.remove(task.id());
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                tasksToCloseDirty.add(task);
+            }
+        }
+
+        for (final Task task : tasksToRecycle) {
+            final Task newTask;
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    if (task.isActive()) {
+                        final Set<TopicPartition> partitions = standbyTasksToCreate.remove(task.id());
+                        newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) task, partitions);
+                        cleanUpTaskProducer(task, taskCloseExceptions);
+                    } else {
+                        final Set<TopicPartition> partitions = activeTasksToCreate.remove(task.id());
+                        newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) task, partitions, mainConsumer);
+                    }
+                    tasks.remove(task.id());
+                    addNewTask(newTask);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);

Review comment:
       Should it be `putIfAbsent` (cf. `cleanUpTaskProducer()`)?
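
   To illustrate the difference this question is about, here is a small sketch with a plain `LinkedHashMap` keyed by task-id strings. The class `FirstExceptionWins` and the helper `record` are hypothetical; whether a task can actually reach `put` twice in `handleCloseAndRecycle` is not established here:

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class FirstExceptionWins {
       // Hypothetical helper: records a failure for a task id, optionally keeping an earlier one.
       static void record(final Map<String, RuntimeException> exceptions,
                          final String taskId,
                          final RuntimeException e,
                          final boolean keepFirst) {
           if (keepFirst) {
               exceptions.putIfAbsent(taskId, e); // an earlier exception for this task is preserved
           } else {
               exceptions.put(taskId, e);         // an earlier exception for this task is overwritten
           }
       }

       public static void main(final String[] args) {
           final Map<String, RuntimeException> overwritten = new LinkedHashMap<>();
           final Map<String, RuntimeException> preserved = new LinkedHashMap<>();
           for (final String message : new String[] {"checkpoint failed", "recycle failed"}) {
               record(overwritten, "0_1", new RuntimeException(message), false);
               record(preserved, "0_1", new RuntimeException(message), true);
           }
           System.out.println(overwritten.get("0_1").getMessage()); // recycle failed
           System.out.println(preserved.get("0_1").getMessage());   // checkpoint failed
       }
   }
   ```

   If the first failure is usually the root cause, `putIfAbsent` keeps the more useful exception for the eventual re-throw.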

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit offsets but only need to
+                    // flush / checkpoint state stores, so we only need to call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of clean).
+                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    tasks.remove(task.id());
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                tasksToCloseDirty.add(task);

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
##########
@@ -379,9 +379,18 @@ private static void verifyReceivedAllRecords(final Map<TopicPartition, List<Cons
         final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
         for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
             final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
-            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = expectedRecords.get(inputTopicPartition).iterator();
+            final List<ConsumerRecord<byte[], byte[]>> receivedRecordsForPartition = partitionRecords.getValue();
+            final List<ConsumerRecord<byte[], byte[]>> expectedRecordsForPartition = expectedRecords.get(inputTopicPartition);
+
+            System.out.println(partitionRecords.getKey() + " with " + receivedRecordsForPartition.size() + ", " +
+                    inputTopicPartition + " with " + expectedRecordsForPartition.size());
+
+            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = expectedRecordsForPartition.iterator();
+            RuntimeException exception = null;
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : receivedRecordsForPartition) {
+                if (!expectedRecord.hasNext())
+                    exception = new RuntimeException("Result verification failed for " + receivedRecord + " since there's no more expected record");

Review comment:
       Should be `break` for this case?

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
##########
@@ -390,9 +399,12 @@ private static void verifyReceivedAllRecords(final Map<TopicPartition, List<Cons
                 final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
 
                 if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
-                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
+                    exception = new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
             }
+
+            if (exception != null)

Review comment:
       Nit: add `{ }`

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
##########
@@ -390,9 +399,12 @@ private static void verifyReceivedAllRecords(final Map<TopicPartition, List<Cons
                 final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
 
                 if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
-                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
+                    exception = new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");

Review comment:
       Should be `break` here?
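
   A minimal sketch of the verification loop with `break` in both places, using integer lists instead of `ConsumerRecord`s. The class `VerifyWithBreak` and the method `firstFailure` are hypothetical. It assumes the loop body calls `next()` on the expected iterator right after the `hasNext()` check, which the visible hunks suggest but do not show:

   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;

   public class VerifyWithBreak {
       // Returns the first verification failure, or null if every received value matched.
       static RuntimeException firstFailure(final List<Integer> received, final List<Integer> expected) {
           final Iterator<Integer> expectedIterator = expected.iterator();
           RuntimeException exception = null;
           for (final Integer receivedValue : received) {
               if (!expectedIterator.hasNext()) {
                   exception = new RuntimeException("No more expected records for " + receivedValue);
                   break; // without this, next() below would throw NoSuchElementException
               }
               final Integer expectedValue = expectedIterator.next();
               if (!receivedValue.equals(expectedValue)) {
                   exception = new RuntimeException("Expected <" + expectedValue + "> but was <" + receivedValue + ">");
                   break; // keep the first mismatch instead of letting a later one overwrite it
               }
           }
           return exception;
       }

       public static void main(final String[] args) {
           // Two mismatches (positions 2 and 3); only the first one is reported
           final RuntimeException failure = firstFailure(Arrays.asList(1, 2, 9, 4), Arrays.asList(1, 2, 3, 5));
           System.out.println(failure.getMessage());
       }
   }
   ```

   Without the `break`, a later mismatch would replace the stored exception, so the reported failure would be the last one rather than the first.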

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())

Review comment:
       nit: add `{ }`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;

Review comment:
       This line just restates what the code itself says. The question seems to be "why", and the comment does not answer it.
   
   From my understanding, because we delete the checkpoint file when the task is initialized, we want to write the checkpoint file again as soon as possible (i.e., on the first commit) instead of waiting until we hit the `OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT`, to keep the time frame in which we have consistent state but no checkpoint file short?
   
   Thus, I am wondering if we should instead just not delete the checkpoint file when we init the task for non-eos, and remove this additional condition?
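
   A rough sketch of what the check could look like with the empty-snapshot condition removed. Only the signature, the null check, and the threshold constant come from the hunk above. The rest is an assumption made for illustration: that `enforceCheckpoint` short-circuits to `true` after the null check, and that the remaining logic compares the summed per-partition offset delta against the threshold. Partitions are keyed by plain strings instead of `TopicPartition` to keep it self-contained:

   ```java
   import java.util.Collections;
   import java.util.Map;

   public class CheckpointDecision {
       static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

       static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                       final Map<String, Long> oldOffsetSnapshot,
                                       final Map<String, Long> newOffsetSnapshot) {
           // From the hunk: a null old snapshot means registration is not done yet
           if (oldOffsetSnapshot == null) {
               return false;
           }
           // ASSUMPTION: an enforced checkpoint bypasses the threshold
           if (enforceCheckpoint) {
               return true;
           }
           // ASSUMPTION: the decision is based on the total offset advance across partitions
           long totalOffsetDelta = 0L;
           for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
               totalOffsetDelta += entry.getValue() - oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L);
           }
           return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
       }

       public static void main(final String[] args) {
           final Map<String, Long> empty = Collections.emptyMap();
           final Map<String, Long> afterFirstCommit = Collections.singletonMap("changelog-0", 5L);
           // Without the extra condition, the first small commit does not trigger a checkpoint
           System.out.println(checkpointNeeded(false, empty, afterFirstCommit));
       }
   }
   ```

   Under these assumptions the first commit after initialization no longer forces a checkpoint, which would only be acceptable if the existing checkpoint file is kept for non-eos as proposed above.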

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)

Review comment:
       nit: add `{ }`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -248,12 +248,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         );
 
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
-
         final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final List<Task> tasksToClose = new LinkedList<>();
-        final Set<Task> tasksToRecycle = new HashSet<>();
-        final Set<Task> dirtyTasks = new HashSet<>();
+        final List<Task> tasksToRecycle = new LinkedList<>();
+        final List<Task> tasksToCloseClean = new LinkedList<>();
+        final List<Task> tasksToCloseDirty = new LinkedList<>();

Review comment:
       Why do we maintain a `List` instead of a `Set`? It seems more natural to me to have a `Set` (i.e., should we instead switch `tasksToRecycle` to use a `Set`, too)?
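
   For illustration, a small sketch of the `Set`-based variant with task-id strings. The class `TaskSetSketch` and the helper `markDirty` are hypothetical. `LinkedHashSet` is used on the guess that iteration order is the reason for the `List`; the PR does not say so:

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.Set;

   public class TaskSetSketch {
       // Hypothetical helper: moves a task id from the clean set to the dirty set.
       static void markDirty(final Set<String> toCloseClean,
                             final Set<String> toCloseDirty,
                             final String taskId) {
           toCloseClean.remove(taskId);
           toCloseDirty.add(taskId); // no-op if the id is already present
       }

       public static void main(final String[] args) {
           // LinkedHashSet keeps insertion order while rejecting duplicates
           final Set<String> clean = new LinkedHashSet<>(Arrays.asList("0_0", "0_1", "1_0"));
           final Set<String> dirty = new LinkedHashSet<>();
           markDirty(clean, dirty, "0_1");
           markDirty(clean, dirty, "0_1"); // marking twice does not create a duplicate entry
           System.out.println("clean=" + clean + ", dirty=" + dirty);
       }
   }
   ```

   With a `List`, the second `add` would insert the task again and it would be closed dirty twice; a `Set` rules that out by construction.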

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit offsets but only need to
+                    // flush / checkpoint state stores, so we only need to call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of clean).
+                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    tasks.remove(task.id());
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                tasksToCloseDirty.add(task);
+            }
+        }
+
+        for (final Task task : tasksToRecycle) {
+            final Task newTask;
+            try {
+                if (!tasksToCloseDirty.contains(task)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -344,40 +343,41 @@ public void resume() {
     }
 
     /**
+     * @throws StreamsException fatal error that should cause the thread to die
+     * @throws TaskMigratedException recoverable error that would cause the task to be removed
      * @return offsets that should be committed for this task
      */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
-        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;

Review comment:
       Why do we remove this? It might be personal preference, but the old code was "cleaner" IMHO. (Feel free to ignore this comment.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -428,53 +428,28 @@ public void resume() {
         return committableOffsets;
     }
 
-    /**
-     * This should only be called if the attempted commit succeeded for this task
-     */
     @Override
-    public void postCommit() {
-        commitRequested = false;
-
+    public void postCommit(final boolean enforceCheckpoint) {
         switch (state()) {
             case CREATED:
                 // We should never write a checkpoint for a CREATED task as we may overwrite an existing checkpoint
                 // with empty uninitialized offsets
-                log.debug("Skipped writing checkpoint for created task");
+                log.debug("Skipped writing checkpoint for {} task", state());
 
                 break;
 
             case RESTORING:
-                writeCheckpoint();
-                log.debug("Finalized commit for restoring task");
+            case SUSPENDED:

Review comment:
       Nit: Can we keep `SUSPENDED` after the `RUNNING` case? We use the same order in all methods and always follow the "natural" state transition order, that is CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
       I would expect that writing the checkpoint file should be cheap anyway (it's just some metadata, how bad can it be?) -- or are you worried about the blocking flush to the file system? Thus, I don't consider double-checkpointing a real issue. This might be a case of premature optimization: it might simplify our code if we just blindly write the checkpoint data even if it did not change. But I am fine either way.
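
   For reference, the behavior under discussion can be reproduced in isolation. The sketch below copies the logic from the hunk above; using `String` keys in place of `TopicPartition` and the class name `CheckpointNeededSketch` are assumptions made only to keep it self-contained.

```java
import java.util.Map;

// Standalone copy of the checkpointNeeded logic from the hunk above.
// Assumption: String keys stand in for TopicPartition.
final class CheckpointNeededSketch {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // a null old snapshot means store registration has not completed
        if (oldOffsetSnapshot == null) {
            return false;
        }

        // first non-empty snapshot after an empty one: always checkpoint
        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) {
            return true;
        }

        // sum the absolute per-partition offset differences
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }

        // enforced: any change triggers a write; otherwise only a change above the threshold
        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }
}
```

   Because both comparisons are strict, an unchanged snapshot never triggers a write, and without `enforceCheckpoint` a total delta of exactly 10,000 does not trigger one either.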

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -428,53 +428,28 @@ public void resume() {
         return committableOffsets;
     }
 
-    /**
-     * This should only be called if the attempted commit succeeded for this task
-     */
     @Override
-    public void postCommit() {
-        commitRequested = false;
-
+    public void postCommit(final boolean enforceCheckpoint) {
         switch (state()) {
             case CREATED:
                 // We should never write a checkpoint for a CREATED task as we may overwrite an existing checkpoint
                 // with empty uninitialized offsets
-                log.debug("Skipped writing checkpoint for created task");
+                log.debug("Skipped writing checkpoint for {} task", state());

Review comment:
       nit: Why this? We know that we are in state `CREATED`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);

Review comment:
       We should remove `task` from `tasksToCloseClean`/`tasksToRecycle` here to maintain the invariant that a task is in only one of the three sets/lists and never in several at the same time. (cf. my comment below)
   
   I understand that you want to share this loop for "cleanClose" and "recycle"; we can extract it into its own method to achieve this.
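
   A minimal sketch of such an extracted method, under stated assumptions: `String` ids stand in for `Task`, a `Predicate` stands in for the prepareCommit/postCommit step, and the names `TaskPartitionSketch` and `checkpointOrMarkDirty` are hypothetical. It is not the actual `TaskManager` code.

```java
import java.util.Iterator;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch: String ids stand in for Task objects.
final class TaskPartitionSketch {
    // Runs the (stand-in) checkpoint step for every candidate. A task that fails
    // is removed from `candidates` and added to `dirty`, so it never sits in
    // both collections at once.
    static void checkpointOrMarkDirty(final Set<String> candidates,
                                      final Set<String> dirty,
                                      final Predicate<String> checkpointSucceeds) {
        final Iterator<String> it = candidates.iterator();
        while (it.hasNext()) {
            final String task = it.next();
            if (!checkpointSucceeds.test(task)) {
                it.remove();      // drop from the clean/recycle collection ...
                dirty.add(task);  // ... and hand over to the dirty collection
            }
        }
    }
}
```

   Calling it once with the close-clean collection and once with the recycle collection would share the loop, and the later loops would no longer need the `contains` check against the dirty collection.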

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit offsets but only need to
+                    // flush / checkpoint state stores, so we only need to call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);

Review comment:
       Should this be `putIfAbsent` (cf. `cleanUpTaskProducer()`)?
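
   To illustrate the difference: `Map.put` replaces an exception already recorded for the task, while `Map.putIfAbsent` keeps the first one. This is only a sketch; `String` ids stand in for `TaskId`, and `FirstExceptionSketch` and `record` are hypothetical names.

```java
import java.util.Map;

// Hypothetical sketch: String ids stand in for TaskId.
final class FirstExceptionSketch {
    // Records `e` for `taskId` only if no earlier exception was recorded for it.
    static void record(final Map<String, RuntimeException> exceptions,
                       final String taskId,
                       final RuntimeException e) {
        exceptions.putIfAbsent(taskId, e);
    }
}
```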

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit offsets but only need to
+                    // flush / checkpoint state stores, so we only need to call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of clean).
+                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    tasks.remove(task.id());
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);

Review comment:
       Should this be `putIfAbsent` (cf. `cleanUpTaskProducer()`)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -479,24 +512,20 @@ boolean tryToCompleteRestoration() {
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
 
-        final Set<Task> revokedTasks = new HashSet<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-
+        final Set<Task> revokedActiveTasks = new HashSet<>();
+        final Set<Task> nonRevokedActiveTasks = new HashSet<>();
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
         for (final Task task : activeTaskIterable()) {
             if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
-                try {
-                    task.suspend();
-                    revokedTasks.add(task);
-                } catch (final RuntimeException e) {
-                    log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
-                    firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e));
-                }
+                // when the task input partitions are included in the revoked list,
+                // this is an active task and should be revoked
+                revokedActiveTasks.add(task);
+                remainingRevokedPartitions.removeAll(task.inputPartitions());
             } else if (task.commitNeeded()) {
-                additionalTasksForCommitting.add(task);
+                nonRevokedActiveTasks.add(task);

Review comment:
       Why this renaming? The set does not contain "all" non-revoked active tasks, only those that need committing.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -243,18 +242,24 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         for (final Task task : tasksToClose) {
             try {
-                if (task.isActive()) {

Review comment:
       What other PR?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit offsets but only need to
+                    // flush / checkpoint state stores, so we only need to call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of clean).
+                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -454,6 +456,41 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
       I agree. IMHO, we should get rid of the KTable/store cache altogether and only have a "changelog-writer-cache" that regular stores and suppress() can use to reduce the write load on the changelog topic. For downstream rate control, users should use suppress() (and we might want to try to unify suppress() and the upstream store somehow eventually to avoid the current redundancy).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }

Review comment:
       While I usually prefer checks like this, it seems unnecessary here? (Feel free to ignore.)



