Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/11 04:04:14 UTC

[kafka] branch kafka-9441-refactor-tm created (now 1966d43)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch kafka-9441-refactor-tm
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at 1966d43  MINOR: code cleanup for Kafka Streams task classes

This branch includes the following new commits:

     new 1966d43  MINOR: code cleanup for Kafka Streams task classes

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 01/01: MINOR: code cleanup for Kafka Streams task classes

Posted by mj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-9441-refactor-tm
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1966d4363e91f5045b014ce375ce0a58ee2a5a79
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed Jun 10 20:05:25 2020 -0700

    MINOR: code cleanup for Kafka Streams task classes
---
 .../streams/processor/internals/AbstractTask.java  |  63 +-
 .../processor/internals/ActiveTaskCreator.java     |  14 +-
 .../streams/processor/internals/StandbyTask.java   | 146 ++--
 .../processor/internals/StandbyTaskCreator.java    |  16 +-
 .../streams/processor/internals/StreamTask.java    | 968 +++++++++++----------
 .../kafka/streams/processor/internals/Task.java    | 111 +--
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../processor/internals/StandbyTaskTest.java       |  11 +-
 .../processor/internals/StreamTaskTest.java        |  94 +-
 .../processor/internals/TaskManagerTest.java       |   3 +-
 .../StreamThreadStateStoreProviderTest.java        |  15 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  14 +-
 12 files changed, 743 insertions(+), 714 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index f59571b..8c342a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
     private Task.State state = CREATED;
-    protected Set<TopicPartition> inputPartitions;
-    protected ProcessorTopology topology;
 
     protected final TaskId id;
+    protected final ProcessorTopology topology;
     protected final StateDirectory stateDirectory;
     protected final ProcessorStateManager stateMgr;
 
+    protected Set<TopicPartition> inputPartitions;
+
     AbstractTask(final TaskId id,
                  final ProcessorTopology topology,
                  final StateDirectory stateDirectory,
                  final ProcessorStateManager stateMgr,
                  final Set<TopicPartition> inputPartitions) {
         this.id = id;
-        this.stateMgr = stateMgr;
         this.topology = topology;
-        this.inputPartitions = inputPartitions;
+        this.stateMgr = stateMgr;
         this.stateDirectory = stateDirectory;
+        this.inputPartitions = inputPartitions;
     }
 
     @Override
-    public TaskId id() {
-        return id;
+    public void revive() {
+        if (state == CLOSED) {
+            transitionTo(CREATED);
+        } else {
+            throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
+        }
     }
 
     @Override
-    public Set<TopicPartition> inputPartitions() {
-        return inputPartitions;
+    public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
+        stateMgr.markChangelogAsCorrupted(partitions);
     }
 
     @Override
-    public Collection<TopicPartition> changelogPartitions() {
-        return stateMgr.changelogPartitions();
+    public final TaskId id() {
+        return id;
     }
 
     @Override
-    public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
-        stateMgr.markChangelogAsCorrupted(partitions);
+    public final Task.State state() {
+        return state;
     }
 
     @Override
-    public StateStore getStore(final String name) {
-        return stateMgr.getStore(name);
+    public void updateInputPartitions(final Set<TopicPartition> topicPartitions,
+                                      final Map<String, List<String>> nodeToSourceTopics) {
+        this.inputPartitions = topicPartitions;
+        topology.updateSourceTopics(nodeToSourceTopics);
     }
 
     @Override
-    public boolean isClosed() {
-        return state() == State.CLOSED;
+    public Set<TopicPartition> inputPartitions() {
+        return inputPartitions;
     }
 
     @Override
-    public final Task.State state() {
-        return state;
+    public StateStore getStore(final String name) {
+        return stateMgr.getStore(name);
     }
 
     @Override
-    public void revive() {
-        if (state == CLOSED) {
-            transitionTo(CREATED);
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
-        }
+    public Collection<TopicPartition> changelogPartitions() {
+        return stateMgr.changelogPartitions();
     }
 
     final void transitionTo(final Task.State newState) {
@@ -118,10 +121,4 @@ public abstract class AbstractTask implements Task {
             }
         }
     }
-
-    @Override
-    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
-        this.inputPartitions = topicPartitions;
-        topology.updateSourceTopics(nodeToSourceTopics);
-    }
 }
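
In the AbstractTask changes above, update(...) is renamed to the more
descriptive updateInputPartitions(...), and id()/state() become final.
A minimal caller-side sketch of the renamed method, assuming "task" is
any Task implementation; the topic, partition, and node names below are
placeholders, not taken from this commit:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.internals.Task;

    class InputPartitionUpdateSketch {
        // Reassign a task to a single (hypothetical) partition of "input-topic";
        // nodeToSourceTopics tells the topology which source node reads that topic.
        static void reassign(final Task task) {
            final Set<TopicPartition> partitions =
                Collections.singleton(new TopicPartition("input-topic", 0));
            final Map<String, List<String>> nodeToSourceTopics =
                Collections.singletonMap("source-node", Collections.singletonList("input-topic"));
            task.updateInputPartitions(partitions, nodeToSourceTopics);  // formerly update(...)
        }
    }
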
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 0a1f47e..4d2dc5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -230,17 +230,17 @@ class ActiveTaskCreator {
 
         final StreamTask task = new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context
+            consumer,
+            recordCollector
         );
 
         log.trace("Created task {} with assigned partitions {}", taskId, partitions);
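
The reordered StreamTask constructor call above groups its dependencies
by concern. A commented restatement of the same call, using the argument
names already in scope in ActiveTaskCreator; the grouping labels are an
editorial reading of the new order, not part of the commit:

    final StreamTask task = new StreamTask(
        taskId,          // task identity
        topology,        // processor topology for this task
        stateDirectory,  // where local state lives on disk
        stateManager,    // manages the task's state stores and changelogs
        partitions,      // assigned input partitions
        config,          // StreamsConfig
        context,         // InternalProcessorContext
        cache,           // ThreadCache used by the stream thread
        streamsMetrics,  // StreamsMetricsImpl registry
        time,            // clock
        consumer,        // main consumer
        recordCollector  // record collector (producer side)
    );
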
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 9c80fd7..90f0aeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -39,11 +39,13 @@ import java.util.Set;
  * A StandbyTask
  */
 public class StandbyTask extends AbstractTask implements Task {
-    private final Logger log;
     private final String logPrefix;
-    private final Sensor closeTaskSensor;
-    private final boolean eosEnabled;
+    private final Logger log;
     private final InternalProcessorContext processorContext;
+    private final boolean eosEnabled;
+
+    // metrics
+    private final Sensor closeTaskSensor;
 
     private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;
 
@@ -57,30 +59,31 @@ public class StandbyTask extends AbstractTask implements Task {
      * @param stateDirectory the {@link StateDirectory} created by the thread
      */
     StandbyTask(final TaskId id,
-                final Set<TopicPartition> partitions,
                 final ProcessorTopology topology,
-                final StreamsConfig config,
-                final StreamsMetricsImpl metrics,
-                final ProcessorStateManager stateMgr,
                 final StateDirectory stateDirectory,
+                final ProcessorStateManager stateMgr,
+                final Set<TopicPartition> partitions,
+                final StreamsConfig config,
+                final InternalProcessorContext processorContext,
                 final ThreadCache cache,
-                final InternalProcessorContext processorContext) {
+                final StreamsMetricsImpl metrics) {
         super(id, topology, stateDirectory, stateMgr, partitions);
-        this.processorContext = processorContext;
-        processorContext.transitionToStandby(cache);
 
+        // logging
         final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
-        logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", id);
+        logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", id.toString());
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
-        closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
+        // members
+        this.processorContext = processorContext;
+        processorContext.transitionToStandby(cache);
+
+        // config
         eosEnabled = StreamThread.eosEnabled(config);
-    }
 
-    @Override
-    public boolean isActive() {
-        return false;
+        // metrics
+        closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
     }
 
     /**
@@ -127,43 +130,6 @@ public class StandbyTask extends AbstractTask implements Task {
         log.trace("No-op resume with state {}", state());
     }
 
-    /**
-     * 1. flush store
-     * 2. write checkpoint file
-     *
-     * @throws StreamsException fatal error, should close the thread
-     */
-    @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
-        if (state() == State.RUNNING || state() == State.SUSPENDED) {
-            stateMgr.flush();
-            log.info("Task ready for committing");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
-        }
-
-        return Collections.emptyMap();
-    }
-
-    @Override
-    public void postCommit() {
-        if (state() == State.RUNNING || state() == State.SUSPENDED) {
-            // since there's no written offsets we can checkpoint with empty map,
-            // and the state current offset would be used to checkpoint
-            stateMgr.checkpoint(Collections.emptyMap());
-            offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
-            log.info("Finalized commit");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
-        }
-    }
-
-    @Override
-    public void closeClean() {
-        close(true);
-        log.info("Closed clean");
-    }
-
     @Override
     public void closeDirty() {
         close(false);
@@ -171,20 +137,9 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
-    public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            stateMgr.recycle();
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
-        }
-
-        closeTaskSensor.record();
-        transitionTo(State.CLOSED);
-
-        log.info("Closed clean and recycled state");
+    public void closeClean() {
+        close(true);
+        log.info("Closed clean");
     }
 
     private void close(final boolean clean) {
@@ -225,14 +180,57 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
+    public void closeAndRecycleState() {
+        suspend();
+        prepareCommit();
+
+        if (state() == State.CREATED || state() == State.SUSPENDED) {
+            stateMgr.recycle();
+        } else {
+            throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
+        }
+
+        closeTaskSensor.record();
+        transitionTo(State.CLOSED);
+
+        log.info("Closed clean and recycled state");
+    }
+
+    @Override
     public boolean commitNeeded() {
         // we can commit if the store's offset has changed since last commit
         return offsetSnapshotSinceLastCommit == null || !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
     }
 
+    /**
+     * 1. flush store
+     * 2. write checkpoint file
+     *
+     * @throws StreamsException fatal error, should close the thread
+     */
     @Override
-    public Map<TopicPartition, Long> changelogOffsets() {
-        return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+        if (state() == State.RUNNING || state() == State.SUSPENDED) {
+            stateMgr.flush();
+            log.info("Task ready for committing");
+        } else {
+            throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
+        }
+
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void postCommit() {
+        if (state() == State.RUNNING || state() == State.SUSPENDED) {
+            // since there's no written offsets we can checkpoint with empty map,
+            // and the state current offset would be used to checkpoint
+            stateMgr.checkpoint(Collections.emptyMap());
+            offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+            log.info("Finalized commit");
+        } else {
+            throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
+        }
     }
 
     @Override
@@ -240,6 +238,16 @@ public class StandbyTask extends AbstractTask implements Task {
         throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
     }
 
+    @Override
+    public boolean isActive() {
+        return false;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> changelogOffsets() {
+        return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+    }
+
     InternalProcessorContext processorContext() {
         return processorContext;
     }
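
The standby commit methods shown above split the cycle into
prepareCommit() (flush the stores), postCommit() (write the checkpoint
file and snapshot the changelog offsets), and commitNeeded() (compare
that snapshot against the current offsets). A hedged sketch of how a
caller might drive that cycle; the wrapper method is hypothetical, only
the three calls and their semantics come from the diff:

    import org.apache.kafka.streams.processor.internals.StandbyTask;

    class StandbyCommitSketch {
        // Commit only when the changelog offsets moved since the last commit.
        static void maybeCommit(final StandbyTask task) {
            if (task.commitNeeded()) {
                task.prepareCommit();  // flushes the state stores (returns an empty offset map for standbys)
                task.postCommit();     // writes the checkpoint file and refreshes the offset snapshot
            }
        }
    }
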
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 443db8e..d7a6586 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -94,7 +94,7 @@ class StandbyTaskCreator {
                     dummyCache
                 );
 
-                createdTasks.add(createStandbyTask(taskId, partitions, topology, stateManager, context));
+                createdTasks.add(createStandbyTask(taskId, topology, stateManager, partitions, context));
             } else {
                 log.trace(
                     "Skipped standby task {} with assigned partitions {} " +
@@ -117,28 +117,28 @@ class StandbyTaskCreator {
 
         return createStandbyTask(
             streamTask.id(),
-            partitions,
             builder.buildSubtopology(streamTask.id.topicGroupId),
             stateManager,
+            partitions,
             context
         );
     }
 
     StandbyTask createStandbyTask(final TaskId taskId,
-                                  final Set<TopicPartition> partitions,
                                   final ProcessorTopology topology,
                                   final ProcessorStateManager stateManager,
+                                  final Set<TopicPartition> partitions,
                                   final InternalProcessorContext context) {
         final StandbyTask task = new StandbyTask(
             taskId,
-            partitions,
             topology,
-            config,
-            streamsMetrics,
-            stateManager,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             dummyCache,
-            context
+            streamsMetrics
         );
 
         log.trace("Created task {} with assigned partitions {}", taskId, partitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3925acc..147f5ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -71,26 +71,26 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     // visible for testing
     static final byte LATEST_MAGIC_BYTE = 1;
 
-    private final Time time;
-    private final Logger log;
     private final String logPrefix;
+    private final Logger log;
+
+    private final InternalProcessorContext processorContext;
+    private final Time time;
     private final Consumer<byte[], byte[]> mainConsumer;
+    private final RecordCollector recordCollector;
+    private final PartitionGroup.RecordInfo recordInfo;
+    private final RecordQueueCreator recordQueueCreator;
+    private final PartitionGroup partitionGroup;
+    private final PunctuationQueue streamTimePunctuationQueue;
+    private final PunctuationQueue systemTimePunctuationQueue;
+    private final Map<TopicPartition, Long> consumedOffsets;
 
     // we want to abstract eos logic out of StreamTask, however
     // there's still an optimization that requires this info to be
     // leaked into this class, which is to checkpoint after committing if EOS is not enabled.
     private final boolean eosEnabled;
-
     private final long maxTaskIdleMs;
     private final int maxBufferedSize;
-    private final PartitionGroup partitionGroup;
-    private final RecordCollector recordCollector;
-    private final PartitionGroup.RecordInfo recordInfo;
-    private final Map<TopicPartition, Long> consumedOffsets;
-    private final PunctuationQueue streamTimePunctuationQueue;
-    private final PunctuationQueue systemTimePunctuationQueue;
-
-    private long processTimeMs = 0L;
 
     private final Sensor closeTaskSensor;
     private final Sensor processRatioSensor;
@@ -99,56 +99,73 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Sensor bufferedRecordsSensor;
     private final Sensor enforcedProcessingSensor;
     private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
-    private final InternalProcessorContext processorContext;
-
-    private final RecordQueueCreator recordQueueCreator;
 
-    private long idleStartTimeMs;
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
-
+    private long idleStartTimeMs = RecordQueue.UNKNOWN;
+    private long processTimeMs = 0L;
     private Map<TopicPartition, Long> checkpoint = null;
 
     public StreamTask(final TaskId id,
-                      final Set<TopicPartition> partitions,
                       final ProcessorTopology topology,
-                      final Consumer<byte[], byte[]> mainConsumer,
-                      final StreamsConfig config,
-                      final StreamsMetricsImpl streamsMetrics,
                       final StateDirectory stateDirectory,
+                      final ProcessorStateManager stateMgr,
+                      final Set<TopicPartition> partitions,
+                      final StreamsConfig config,
+                      final InternalProcessorContext processorContext,
                       final ThreadCache cache,
+                      final StreamsMetricsImpl streamsMetrics,
                       final Time time,
-                      final ProcessorStateManager stateMgr,
-                      final RecordCollector recordCollector,
-                      final InternalProcessorContext processorContext) {
+                      final Consumer<byte[], byte[]> mainConsumer,
+                      final RecordCollector recordCollector) {
         super(id, topology, stateDirectory, stateMgr, partitions);
-        this.mainConsumer = mainConsumer;
 
-        this.processorContext = processorContext;
-        processorContext.transitionToActive(this, recordCollector, cache);
+        final String taskId = id.toString();
+        final String threadId = Thread.currentThread().getName();
 
-        final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
-        logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", id);
+        // logging
+        final String threadIdPrefix = String.format("stream-thread [%s] ", threadId);
+        logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", taskId);
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
+        // members
+        this.processorContext = processorContext;
+        processorContext.transitionToActive(this, recordCollector, cache);
         this.time = time;
+        this.mainConsumer = mainConsumer;
         this.recordCollector = recordCollector;
+
+        recordInfo = new PartitionGroup.RecordInfo();
+        recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
+        partitionGroup = new PartitionGroup(
+            createPartitionQueues(),
+            TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics)
+        );
+        streamTimePunctuationQueue = new PunctuationQueue();
+        systemTimePunctuationQueue = new PunctuationQueue();
+        consumedOffsets = new HashMap<>();
+
+        stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // config
         eosEnabled = StreamThread.eosEnabled(config);
+        maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
-        final String threadId = Thread.currentThread().getName();
+        // metrics
         closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
-        final String taskId = id.toString();
+        processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
+        processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
+        punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
+        bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
+
         if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
             final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
             enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
         } else {
             enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
         }
-        processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
-        processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
-        punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
-        bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
 
         for (final String terminalNode : topology.terminalNodes()) {
             e2eLatencySensors.put(
@@ -164,22 +181,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, processorId, RecordingLevel.INFO, streamsMetrics)
             );
         }
-
-        streamTimePunctuationQueue = new PunctuationQueue();
-        systemTimePunctuationQueue = new PunctuationQueue();
-        maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
-        maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-
-        // initialize the consumed and committed offset cache
-        consumedOffsets = new HashMap<>();
-
-        recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
-
-        recordInfo = new PartitionGroup.RecordInfo();
-        partitionGroup = new PartitionGroup(createPartitionQueues(),
-                TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics));
-
-        stateMgr.registerGlobalStateStores(topology.globalStateStores());
     }
 
     // create queues for each assigned partition and associate them
@@ -192,11 +193,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return partitionQueues;
     }
 
-    @Override
-    public boolean isActive() {
-        return true;
-    }
-
     /**
      * @throws LockException    could happen when multi-threads within the single instance, could retry
      * @throws TimeoutException if initializing record collector timed out
@@ -246,6 +242,59 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
+    private void initializeMetadata() {
+        try {
+            final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions()).entrySet().stream()
+                .filter(e -> e.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            initializeTaskTime(offsetsAndMetadata);
+        } catch (final TimeoutException e) {
+            log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
+                    "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
+                e.toString(),
+                ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+
+            throw e;
+        } catch (final KafkaException e) {
+            throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", id, inputPartitions()), e);
+        }
+    }
+
+    private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
+        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
+            final TopicPartition partition = entry.getKey();
+            final OffsetAndMetadata metadata = entry.getValue();
+
+            if (metadata != null) {
+                final long committedTimestamp = decodeTimestamp(metadata.metadata());
+                partitionGroup.setPartitionTime(partition, committedTimestamp);
+                log.debug("A committed timestamp was detected: setting the partition time of partition {}"
+                    + " to {} in stream task {}", partition, committedTimestamp, id);
+            } else {
+                log.debug("No committed timestamp was found in metadata for partition {}", partition);
+            }
+        }
+
+        final Set<TopicPartition> nonCommitted = new HashSet<>(inputPartitions());
+        nonCommitted.removeAll(offsetsAndMetadata.keySet());
+        for (final TopicPartition partition : nonCommitted) {
+            log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition);
+        }
+    }
+
+    private void initializeTopology() {
+        // initialize the task by initializing all its processor nodes in the topology
+        log.trace("Initializing processor nodes of the topology");
+        for (final ProcessorNode<?, ?> node : topology.processors()) {
+            processorContext.setCurrentNode(node);
+            try {
+                node.init(processorContext);
+            } finally {
+                processorContext.setCurrentNode(null);
+            }
+        }
+    }
+
     @Override
     public void suspend() {
         switch (state()) {
@@ -333,296 +382,119 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
+    @Override
+    public void closeDirty() {
+        close(false);
+        log.info("Closed dirty");
+    }
+
+    @Override
+    public void closeClean() {
+        close(true);
+        log.info("Closed clean");
+    }
+
     /**
-     * @return offsets that should be committed for this task
+     * the following order must be followed:
+     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
+     *  2. then if we are closing on EOS and dirty, wipe out the state store directory
+     *  3. finally release the state manager lock
      */
-    @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+    private void close(final boolean clean) {
+        if (clean) {
+            executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
+        }
+
         switch (state()) {
-            case RUNNING:
+            case CREATED:
             case RESTORING:
             case SUSPENDED:
-                maybeScheduleCheckpoint();
-                stateMgr.flush();
-                recordCollector.flush();
+                // first close state manager (which is idempotent) then close the record collector
+                // if the latter throws and we re-close dirty which would close the state manager again.
+                executeAndMaybeSwallow(
+                    clean,
+                    () -> StateManagerUtil.closeStateManager(
+                        log,
+                        logPrefix,
+                        clean,
+                        eosEnabled,
+                        stateMgr,
+                        stateDirectory,
+                        TaskType.ACTIVE
+                    ),
+                    "state manager close",
+                    log);
 
-                log.debug("Prepared task for committing");
+                executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
 
                 break;
 
-            case CREATED:
             case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");
+                log.trace("Skip closing since state is {}", state());
+                return;
+
+            case RUNNING:
+                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
 
             default:
-                throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
+                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
         }
 
-        return committableOffsetsAndMetadata();
+        partitionGroup.clear();
+        closeTaskSensor.record();
+
+        transitionTo(State.CLOSED);
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
-        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
+    @Override
+    public void closeAndRecycleState() {
+        suspend();
+        prepareCommit();
+        writeCheckpointIfNeed();
 
         switch (state()) {
             case CREATED:
-            case RESTORING:
-                committableOffsets = Collections.emptyMap();
-
-                break;
-
-            case RUNNING:
             case SUSPENDED:
-                final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
-
-                committableOffsets = new HashMap<>(consumedOffsets.size());
-                for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
-                    final TopicPartition partition = entry.getKey();
-                    Long offset = partitionGroup.headRecordOffset(partition);
-                    if (offset == null) {
-                        try {
-                            offset = mainConsumer.position(partition);
-                        } catch (final TimeoutException error) {
-                            // the `consumer.position()` call should never block, because we know that we did process data
-                            // for the requested partition and thus the consumer should have a valid local position
-                            // that it can return immediately
-
-                            // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
-                            throw new IllegalStateException(error);
-                        } catch (final KafkaException fatal) {
-                            throw new StreamsException(fatal);
-                        }
-                    }
-                    final long partitionTime = partitionTimes.get(partition);
-                    committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
-                }
+                stateMgr.recycle();
+                recordCollector.close();
 
                 break;
 
+            case RESTORING: // we should have transitioned to `SUSPENDED` already
+            case RUNNING: // we should have transitioned to `SUSPENDED` already
             case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while getting commitable offsets for active task " + id);
-
+                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
             default:
-                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
+                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
         }
 
-        return committableOffsets;
-    }
-
-    @Override
-    public void postCommit() {
-        commitRequested = false;
-        commitNeeded = false;
+        partitionGroup.clear();
+        closeTaskSensor.record();
 
-        switch (state()) {
-            case RESTORING:
-                writeCheckpointIfNeed();
+        transitionTo(State.CLOSED);
 
-                break;
-
-            case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
-                }
-
-                break;
-
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();
-
-                break;
-
-            case CREATED:
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
-        }
-
-        log.debug("Committed");
-    }
-
-    private Map<TopicPartition, Long> extractPartitionTimes() {
-        final Map<TopicPartition, Long> partitionTimes = new HashMap<>();
-        for (final TopicPartition partition : partitionGroup.partitions()) {
-            partitionTimes.put(partition, partitionGroup.partitionTimestamp(partition));
-        }
-        return partitionTimes;
-    }
-
-    @Override
-    public void closeClean() {
-        close(true);
-        log.info("Closed clean");
-    }
-
-    @Override
-    public void closeDirty() {
-        close(false);
-        log.info("Closed dirty");
-    }
-
-    @Override
-    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
-        super.update(topicPartitions, nodeToSourceTopics);
-        partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
-    }
-
-    @Override
-    public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
-        switch (state()) {
-            case CREATED:
-            case SUSPENDED:
-                stateMgr.recycle();
-                recordCollector.close();
-
-                break;
-
-            case RESTORING: // we should have transitioned to `SUSPENDED` already
-            case RUNNING: // we should have transitioned to `SUSPENDED` already
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
-        }
-
-        partitionGroup.clear();
-        closeTaskSensor.record();
-
-        transitionTo(State.CLOSED);
-
-        log.info("Closed clean and recycled state");
-    }
-
-    private void maybeScheduleCheckpoint() {
-        switch (state()) {
-            case RESTORING:
-            case SUSPENDED:
-                this.checkpoint = checkpointableOffsets();
-
-                break;
-
-            case RUNNING:
-                if (!eosEnabled) {
-                    this.checkpoint = checkpointableOffsets();
-                }
-
-                break;
-
-            case CREATED:
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
-        }
-    }
-
-    private void writeCheckpointIfNeed() {
-        if (commitNeeded) {
-            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
-        }
-        if (checkpoint != null) {
-            stateMgr.checkpoint(checkpoint);
-            checkpoint = null;
-        }
-    }
+        log.info("Closed clean and recycled state");
+    }
 
     /**
-     * <pre>
-     * the following order must be followed:
-     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
-     *  2. then if we are closing on EOS and dirty, wipe out the state store directory
-     *  3. finally release the state manager lock
-     * </pre>
-     */
-    private void close(final boolean clean) {
-        if (clean) {
-            executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
-        }
-
-        switch (state()) {
-            case CREATED:
-            case RESTORING:
-            case SUSPENDED:
-                // first close state manager (which is idempotent) then close the record collector
-                // if the latter throws and we re-close dirty which would close the state manager again.
-                executeAndMaybeSwallow(
-                    clean,
-                    () -> StateManagerUtil.closeStateManager(
-                        log,
-                        logPrefix,
-                        clean,
-                        eosEnabled,
-                        stateMgr,
-                        stateDirectory,
-                        TaskType.ACTIVE
-                    ),
-                    "state manager close",
-                    log);
-
-                executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
-
-                break;
-
-            case CLOSED:
-                log.trace("Skip closing since state is {}", state());
-                return;
-
-            case RUNNING:
-                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
-        }
-
-        partitionGroup.clear();
-        closeTaskSensor.record();
-
-        transitionTo(State.CLOSED);
-    }
-
-    /**
-     * An active task is processable if its buffer contains data for all of its input
-     * source topic partitions, or if it is enforced to be processable
+     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
+     * and not added to the queue for processing
+     *
+     * @param partition the partition
+     * @param records   the records
      */
-    public boolean isProcessable(final long wallClockTime) {
-        if (state() == State.CLOSED) {
-            // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
-            // in either case we can just log it and move on without notifying the thread since the consumer
-            // would soon be updated to not return any records for this task anymore.
-            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+    @Override
+    public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
-            return false;
+        if (log.isTraceEnabled()) {
+            log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
         }
 
-        if (partitionGroup.allPartitionsBuffered()) {
-            idleStartTimeMs = RecordQueue.UNKNOWN;
-            return true;
-        } else if (partitionGroup.numBuffered() > 0) {
-            if (idleStartTimeMs == RecordQueue.UNKNOWN) {
-                idleStartTimeMs = wallClockTime;
-            }
-
-            if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
-                enforcedProcessingSensor.record(1.0d, wallClockTime);
-                return true;
-            } else {
-                return false;
-            }
-        } else {
-            // there's no data in any of the topics; we should reset the enforced
-            // processing timer
-            idleStartTimeMs = RecordQueue.UNKNOWN;
-            return false;
+        // if after adding these records, its partition queue's buffered size has been
+        // increased beyond the threshold, we can then pause the consumption for this partition
+        if (newQueueSize > maxBufferedSize) {
+            mainConsumer.pause(singleton(partition));
         }
     }
 
@@ -673,13 +545,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         } catch (final RuntimeException e) {
             final String stackTrace = getStacktraceString(e);
             throw new StreamsException(String.format("Exception caught in process. taskId=%s, " +
-                                                  "processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
-                                              id(),
-                                              processorContext.currentNode().name(),
-                                              record.topic(),
-                                              record.partition(),
-                                              record.offset(),
-                                              stackTrace
+                    "processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
+                id(),
+                processorContext.currentNode().name(),
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                stackTrace
             ), e);
         } finally {
             processorContext.setCurrentNode(null);
@@ -688,6 +560,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return true;
     }
 
+    private String getStacktraceString(final RuntimeException e) {
+        String stacktrace = null;
+        try (final StringWriter stringWriter = new StringWriter();
+             final PrintWriter printWriter = new PrintWriter(stringWriter)) {
+            e.printStackTrace(printWriter);
+            stacktrace = stringWriter.toString();
+        } catch (final IOException ioe) {
+            log.error("Encountered error extracting stacktrace from this exception", ioe);
+        }
+        return stacktrace;
+    }
+
     @Override
     public void recordProcessBatchTime(final long processBatchTime) {
         processTimeMs += processBatchTime;
@@ -700,16 +584,50 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         processTimeMs = 0L;
     }
 
-    private String getStacktraceString(final RuntimeException e) {
-        String stacktrace = null;
-        try (final StringWriter stringWriter = new StringWriter();
-             final PrintWriter printWriter = new PrintWriter(stringWriter)) {
-            e.printStackTrace(printWriter);
-            stacktrace = stringWriter.toString();
-        } catch (final IOException ioe) {
-            log.error("Encountered error extracting stacktrace from this exception", ioe);
+    /**
+     * Possibly trigger registered stream-time punctuation functions if
+     * current partition group timestamp has reached the defined stamp
+     * Note, this is only called in the presence of new records
+     *
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
+    @Override
+    public boolean maybePunctuateStreamTime() {
+        final long streamTime = partitionGroup.streamTime();
+
+        // if the timestamp is not known yet, meaning there is not enough data accumulated
+        // to reason stream partition time, then skip.
+        if (streamTime == RecordQueue.UNKNOWN) {
+            return false;
+        } else {
+            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
+
+            if (punctuated) {
+                commitNeeded = true;
+            }
+
+            return punctuated;
         }
-        return stacktrace;
+    }
+
+    /**
+     * Possibly trigger registered system-time punctuation functions if
+     * current system timestamp has reached the defined stamp
+     * Note, this is called irrespective of the presence of new records
+     *
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
+    @Override
+    public boolean maybePunctuateSystemTime() {
+        final long systemTime = time.milliseconds();
+
+        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
+
+        if (punctuated) {
+            commitNeeded = true;
+        }
+
+        return punctuated;
     }
 
     /**
@@ -742,16 +660,69 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?> currNode, final long wallClockTime) {
-        processorContext.setRecordContext(
-            new ProcessorRecordContext(
-                record.timestamp,
-                record.offset(),
-                record.partition(),
-                record.topic(),
-                record.headers()));
-        processorContext.setCurrentNode(currNode);
-        processorContext.setSystemTimeMs(wallClockTime);
+    @Override
+    public boolean commitNeeded() {
+        return commitNeeded;
+    }
+
+    /**
+     * Whether or not a request has been made to commit the current state
+     */
+    @Override
+    public boolean commitRequested() {
+        return commitRequested;
+    }
+
+    /**
+     * @return offsets that should be committed for this task
+     */
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+        switch (state()) {
+            case RUNNING:
+            case RESTORING:
+            case SUSPENDED:
+                maybeScheduleCheckpoint();
+                stateMgr.flush();
+                recordCollector.flush();
+
+                log.debug("Prepared task for committing");
+
+                break;
+
+            case CREATED:
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
+        }
+
+        return committableOffsetsAndMetadata();
+    }
+
+    private void maybeScheduleCheckpoint() {
+        switch (state()) {
+            case RESTORING:
+            case SUSPENDED:
+                this.checkpoint = checkpointableOffsets();
+
+                break;
+
+            case RUNNING:
+                if (!eosEnabled) {
+                    this.checkpoint = checkpointableOffsets();
+                }
+
+                break;
+
+            case CREATED:
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
+        }
     }
 
     /**
@@ -763,47 +734,100 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
             checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
         }
-        return checkpointableOffsets;
+        return checkpointableOffsets;
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
+        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
+
+        switch (state()) {
+            case CREATED:
+            case RESTORING:
+                committableOffsets = Collections.emptyMap();
+
+                break;
+
+            case RUNNING:
+            case SUSPENDED:
+                final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
+
+                committableOffsets = new HashMap<>(consumedOffsets.size());
+                for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+                    final TopicPartition partition = entry.getKey();
+                    Long offset = partitionGroup.headRecordOffset(partition);
+                    if (offset == null) {
+                        try {
+                            offset = mainConsumer.position(partition);
+                        } catch (final TimeoutException error) {
+                            // the `consumer.position()` call should never block, because we know that we did process data
+                            // for the requested partition and thus the consumer should have a valid local position
+                            // that it can return immediately
+
+                            // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+                            throw new IllegalStateException(error);
+                        } catch (final KafkaException fatal) {
+                            throw new StreamsException(fatal);
+                        }
+                    }
+                    final long partitionTime = partitionTimes.get(partition);
+                    committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
+                }
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while getting commitable offsets for active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
+        }
+
+        return committableOffsets;
+    }
+
+    private Map<TopicPartition, Long> extractPartitionTimes() {
+        final Map<TopicPartition, Long> partitionTimes = new HashMap<>();
+        for (final TopicPartition partition : partitionGroup.partitions()) {
+            partitionTimes.put(partition, partitionGroup.partitionTimestamp(partition));
+        }
+        return partitionTimes;
     }
 
-    private void initializeMetadata() {
-        try {
-            final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions()).entrySet().stream()
-                    .filter(e -> e.getValue() != null)
-                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-            initializeTaskTime(offsetsAndMetadata);
-        } catch (final TimeoutException e) {
-            log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
-                            "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
-                    e.toString(),
-                    ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+    @Override
+    public void postCommit() {
+        commitRequested = false;
+        commitNeeded = false;
 
-            throw e;
-        } catch (final KafkaException e) {
-            throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", id, inputPartitions()), e);
-        }
-    }
+        switch (state()) {
+            case RESTORING:
+                writeCheckpointIfNeed();
 
-    private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
-        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
-            final TopicPartition partition = entry.getKey();
-            final OffsetAndMetadata metadata = entry.getValue();
+                break;
 
-            if (metadata != null) {
-                final long committedTimestamp = decodeTimestamp(metadata.metadata());
-                partitionGroup.setPartitionTime(partition, committedTimestamp);
-                log.debug("A committed timestamp was detected: setting the partition time of partition {}"
-                    + " to {} in stream task {}", partition, committedTimestamp, id);
-            } else {
-                log.debug("No committed timestamp was found in metadata for partition {}", partition);
-            }
-        }
+            case RUNNING:
+                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
+                    writeCheckpointIfNeed();
+                }
 
-        final Set<TopicPartition> nonCommitted = new HashSet<>(inputPartitions());
-        nonCommitted.removeAll(offsetsAndMetadata.keySet());
-        for (final TopicPartition partition : nonCommitted) {
-            log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition);
+                break;
+
+            case SUSPENDED:
+                writeCheckpointIfNeed();
+                // we cannot `clear()` the `PartitionGroup` within `suspend()` itself, but only after committing,
+                // because otherwise we would lose the partition-time information
+                partitionGroup.clear();
+
+                break;
+
+            case CREATED:
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
         }
+
+        log.debug("Committed");
     }
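
Taken together, the postCommit() switch above and the writeCheckpointIfNeed() helper further down in this diff encode a simple rule: a checkpoint write is attempted only when no commit is pending, always after committing in RESTORING or SUSPENDED state, and in RUNNING state only when EOS is disabled (under EOS the transactional commit already covers the state stores). The following is a minimal, self-contained sketch of just that branching; the class name, the State enum, and the eosEnabled flag are stand-ins for illustration, not the actual Kafka Streams types.

    // Hypothetical sketch of the checkpoint decision made in postCommit(); illustration only.
    public final class CheckpointDecisionSketch {

        enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

        static boolean shouldAttemptCheckpoint(final State state, final boolean eosEnabled) {
            switch (state) {
                case RESTORING:
                case SUSPENDED:
                    // always try to flush a checkpoint after committing while restoring or suspending
                    return true;
                case RUNNING:
                    // under EOS the transactional commit owns the state; checkpoint only for at-least-once
                    return !eosEnabled;
                default:
                    throw new IllegalStateException("Illegal state " + state + " while post committing");
            }
        }

        public static void main(final String[] args) {
            System.out.println(shouldAttemptCheckpoint(State.RUNNING, true));    // false: EOS skips the checkpoint
            System.out.println(shouldAttemptCheckpoint(State.RUNNING, false));   // true
            System.out.println(shouldAttemptCheckpoint(State.SUSPENDED, true));  // true
        }
    }

The actual helper additionally requires that commitNeeded is false and that a checkpoint map has been prepared, which the sketch does not model.
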
 
     @Override
@@ -819,38 +843,72 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return purgeableConsumedOffsets;
     }
 
-    private void initializeTopology() {
-        // initialize the task by initializing all its processor nodes in the topology
-        log.trace("Initializing processor nodes of the topology");
-        for (final ProcessorNode<?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
-            try {
-                node.init(processorContext);
-            } finally {
-                processorContext.setCurrentNode(null);
-            }
+    @Override
+    public boolean isActive() {
+        return true;
+    }
+
+    @Override
+    public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+        super.updateInputPartitions(topicPartitions, nodeToSourceTopics);
+        partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
+    }
+
+    @Override
+    public Map<TopicPartition, Long> changelogOffsets() {
+        if (state() == State.RUNNING) {
+            // if we are in running state, just return the latest offset sentinel indicating
+            // we should be at the end of the changelog
+            return changelogPartitions().stream()
+                .collect(Collectors.toMap(Function.identity(), tp -> Task.LATEST_OFFSET));
+        } else {
+            return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+        }
+    }
+
+    private void writeCheckpointIfNeed() {
+        if (commitNeeded) {
+            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
+        }
+        if (checkpoint != null) {
+            stateMgr.checkpoint(checkpoint);
+            checkpoint = null;
         }
     }
 
     /**
-     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
-     * and not added to the queue for processing
-     *
-     * @param partition the partition
-     * @param records   the records
+     * An active task is processable if its buffer contains data for all of its input source topic
+     * partitions, or if processing is enforced because the task has been idle for longer than the max task idle time
      */
-    @Override
-    public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
+    public boolean isProcessable(final long wallClockTime) {
+        if (state() == State.CLOSED) {
+            // a task is only closing / closed when 1) the task manager is closing, or 2) it is undergoing a rebalance;
+            // in either case we can just log it and move on without notifying the thread since the consumer
+            // would soon be updated to not return any records for this task anymore.
+            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
 
-        if (log.isTraceEnabled()) {
-            log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
+            return false;
         }
 
-        // if after adding these records, its partition queue's buffered size has been
-        // increased beyond the threshold, we can then pause the consumption for this partition
-        if (newQueueSize > maxBufferedSize) {
-            mainConsumer.pause(singleton(partition));
+        if (partitionGroup.allPartitionsBuffered()) {
+            idleStartTimeMs = RecordQueue.UNKNOWN;
+            return true;
+        } else if (partitionGroup.numBuffered() > 0) {
+            if (idleStartTimeMs == RecordQueue.UNKNOWN) {
+                idleStartTimeMs = wallClockTime;
+            }
+
+            if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
+                enforcedProcessingSensor.record(1.0d, wallClockTime);
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            // there's no data in any of the topics; we should reset the enforced
+            // processing timer
+            idleStartTimeMs = RecordQueue.UNKNOWN;
+            return false;
         }
     }
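
The rewritten isProcessable() above implements the enforced-processing behavior: if only some of the input partitions have buffered records, the task waits up to the configured max task idle time before processing with an incomplete buffer, and the idle timer is reset whenever the buffer is either complete or empty. A rough standalone sketch of just the timer bookkeeping follows; the class, field, and parameter names are invented stand-ins.

    // Hypothetical sketch of the enforced-processing idle timer used by isProcessable(); illustration only.
    final class EnforcedProcessingSketch {
        private static final long UNKNOWN = -1L;   // stand-in for RecordQueue.UNKNOWN

        private final long maxTaskIdleMs;
        private long idleStartTimeMs = UNKNOWN;

        EnforcedProcessingSketch(final long maxTaskIdleMs) {
            this.maxTaskIdleMs = maxTaskIdleMs;
        }

        boolean isProcessable(final boolean allPartitionsBuffered,
                              final int numBuffered,
                              final long wallClockTime) {
            if (allPartitionsBuffered) {
                idleStartTimeMs = UNKNOWN;            // all inputs have data: process right away
                return true;
            } else if (numBuffered > 0) {
                if (idleStartTimeMs == UNKNOWN) {
                    idleStartTimeMs = wallClockTime;  // start the idle timer on the first partial buffer
                }
                return wallClockTime - idleStartTimeMs >= maxTaskIdleMs;
            } else {
                idleStartTimeMs = UNKNOWN;            // nothing buffered at all: reset the timer
                return false;
            }
        }
    }

Repeated calls with a partially filled buffer keep returning false until maxTaskIdleMs has elapsed since the first such call, which is the point at which the enforcedProcessingSensor above records a hit.
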
 
@@ -902,48 +960,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    /**
-     * Possibly trigger registered stream-time punctuation functions if
-     * current partition group timestamp has reached the defined stamp
-     * Note, this is only called in the presence of new records
-     *
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
-    public boolean maybePunctuateStreamTime() {
-        final long streamTime = partitionGroup.streamTime();
-
-        // if the timestamp is not known yet, meaning there is not enough data accumulated
-        // to reason stream partition time, then skip.
-        if (streamTime == RecordQueue.UNKNOWN) {
-            return false;
-        } else {
-            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
-
-            if (punctuated) {
-                commitNeeded = true;
-            }
-
-            return punctuated;
-        }
+    private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?> currNode, final long wallClockTime) {
+        processorContext.setRecordContext(
+            new ProcessorRecordContext(
+                record.timestamp,
+                record.offset(),
+                record.partition(),
+                record.topic(),
+                record.headers()));
+        processorContext.setCurrentNode(currNode);
+        processorContext.setSystemTimeMs(wallClockTime);
     }
 
     /**
-     * Possibly trigger registered system-time punctuation functions if
-     * current system timestamp has reached the defined stamp
-     * Note, this is called irrespective of the presence of new records
-     *
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     * Request committing the current task's state
      */
-    public boolean maybePunctuateSystemTime() {
-        final long systemTime = time.milliseconds();
-
-        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
-
-        if (punctuated) {
-            commitNeeded = true;
-        }
-
-        return punctuated;
+    void requestCommit() {
+        commitRequested = true;
     }
 
     void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
@@ -959,21 +992,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    /**
-     * Request committing the current task's state
-     */
-    void requestCommit() {
-        commitRequested = true;
+    public InternalProcessorContext processorContext() {
+        return processorContext;
     }
 
-    /**
-     * Whether or not a request has been made to commit the current state
-     */
-    @Override
-    public boolean commitRequested() {
-        return commitRequested;
+    public boolean hasRecordsQueued() {
+        return numBuffered() > 0;
     }
 
+    // visible for testing
     static String encodeTimestamp(final long partitionTime) {
         final ByteBuffer buffer = ByteBuffer.allocate(9);
         buffer.put(LATEST_MAGIC_BYTE);
@@ -981,6 +1008,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return Base64.getEncoder().encodeToString(buffer.array());
     }
 
+    // visible for testing
     long decodeTimestamp(final String encryptedString) {
         if (encryptedString.isEmpty()) {
             return RecordQueue.UNKNOWN;
@@ -992,15 +1020,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 return buffer.getLong();
             default:
                 log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
-                         LATEST_MAGIC_BYTE, version);
+                    LATEST_MAGIC_BYTE, version);
                 return RecordQueue.UNKNOWN;
         }
     }
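
The encodeTimestamp()/decodeTimestamp() pair above stores the partition time inside the committed offset metadata as a one-byte magic/version marker followed by the timestamp as a big-endian long, Base64-encoded into a string. Below is a self-contained round-trip illustration of that format; it is a re-implementation for clarity only, and the concrete magic-byte value is an assumption rather than something taken from this diff.

    import java.nio.ByteBuffer;
    import java.util.Base64;

    // Standalone re-implementation of the offset-metadata timestamp format; illustration only.
    public final class TimestampMetadataSketch {
        private static final byte LATEST_MAGIC_BYTE = 1;   // assumed version marker
        private static final long UNKNOWN = -1L;           // stand-in for RecordQueue.UNKNOWN

        static String encodeTimestamp(final long partitionTime) {
            final ByteBuffer buffer = ByteBuffer.allocate(9);   // 1 byte version + 8 byte timestamp
            buffer.put(LATEST_MAGIC_BYTE);
            buffer.putLong(partitionTime);
            return Base64.getEncoder().encodeToString(buffer.array());
        }

        static long decodeTimestamp(final String encoded) {
            if (encoded.isEmpty()) {
                return UNKNOWN;
            }
            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
            final byte version = buffer.get();
            return version == LATEST_MAGIC_BYTE ? buffer.getLong() : UNKNOWN;
        }

        public static void main(final String[] args) {
            final String metadata = encodeTimestamp(1_591_851_600_000L);
            System.out.println(metadata + " -> " + decodeTimestamp(metadata));   // round-trips the timestamp
        }
    }
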
 
-    public InternalProcessorContext processorContext() {
-        return processorContext;
+
+
+    // for testing only
+
+    RecordCollector recordCollector() {
+        return recordCollector;
+    }
+
+    int numBuffered() {
+        return partitionGroup.numBuffered();
+    }
+
+    long streamTime() {
+        return partitionGroup.streamTime();
     }
 
+
+
     /**
      * Produces a string representation containing useful information about a Task.
      * This is useful in debugging scenarios.
@@ -1043,40 +1085,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return sb.toString();
     }
 
-    @Override
-    public boolean commitNeeded() {
-        return commitNeeded;
-    }
-
-    @Override
-    public Map<TopicPartition, Long> changelogOffsets() {
-        if (state() == State.RUNNING) {
-            // if we are in running state, just return the latest offset sentinel indicating
-            // we should be at the end of the changelog
-            return changelogPartitions().stream()
-                                        .collect(Collectors.toMap(Function.identity(), tp -> Task.LATEST_OFFSET));
-        } else {
-            return Collections.unmodifiableMap(stateMgr.changelogOffsets());
-        }
-    }
-
-    public boolean hasRecordsQueued() {
-        return numBuffered() > 0;
-    }
-
-    RecordCollector recordCollector() {
-        return recordCollector;
-    }
-
-    // below are visible for testing only
-    int numBuffered() {
-        return partitionGroup.numBuffered();
-    }
-
-    long streamTime() {
-        return partitionGroup.streamTime();
-    }
-
     private class RecordQueueCreator {
         private final LogContext logContext;
         private final TimestampExtractor defaultTimestampExtractor;
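
Further up in this diff, updateInputPartitions() passes recordQueueCreator::createQueue into partitionGroup.updatePartitions() so that the group can lazily create a record queue for any newly assigned partition. Below is a generic sketch of that factory-by-method-reference pattern; the class and method names are invented and it does not reproduce the actual PartitionGroup logic.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Function;

    // Generic illustration of passing a queue factory as a method reference; all names are invented.
    final class QueueRegistrySketch<P, Q> {
        private final Map<P, Q> queues = new HashMap<>();

        void updatePartitions(final Set<P> newPartitions, final Function<P, Q> queueFactory) {
            queues.keySet().retainAll(newPartitions);              // drop queues of revoked partitions
            for (final P partition : newPartitions) {
                queues.computeIfAbsent(partition, queueFactory);   // create queues only for new partitions
            }
        }

        int size() {
            return queues.size();
        }
    }

A caller would invoke it as registry.updatePartitions(assignment, this::newQueue), mirroring how StreamTask hands its RecordQueueCreator to the partition group.
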
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 62332c7..2032c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -29,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -97,13 +97,9 @@ public interface Task {
         }
     }
 
-    TaskId id();
 
-    State state();
 
-    boolean isActive();
-
-    boolean isClosed();
+    // idempotent life-cycle methods
 
     /**
      * @throws LockException could happen when multi-threads within the single instance, could retry
@@ -116,89 +112,100 @@ public interface Task {
      */
     void completeRestoration();
 
-    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
-    void postCommit();
-
     void suspend();
 
     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();
 
-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();
 
-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods
 
     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void revive();
 
     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();
 
-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set<TopicPartition> inputPartitions();
+    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection<TopicPartition> changelogPartitions();
 
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     *         indicating the current positions of the logged state stores of the task.
-     */
-    Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (used in RUNNING state)
 
-    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-    default Map<TopicPartition, Long> purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }
 
     default void recordProcessBatchTime(final long processBatchTime) {}
 
     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }
 
-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }
 
-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }
 
-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+    void postCommit();
+
+    default Map<TopicPartition, Long> purgeableOffsets() {
+        return Collections.emptyMap();
     }
 
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+
+    Set<TopicPartition> inputPartitions();
+
+    /**
+     * @return any changelog partitions associated with this task
+     */
+    Collection<TopicPartition> changelogPartitions();
+
+
+    // IQ related methods
+
+    StateStore getStore(final String name);
+
+    /**
+     * @return the offsets of all the changelog partitions associated with this task,
+     *         indicating the current positions of the logged state stores of the task.
+     */
+    Map<TopicPartition, Long> changelogOffsets();
+
 }
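
Read top to bottom, the regrouped interface now roughly follows call order: idempotent life-cycle methods, non-idempotent life-cycle methods, the runtime methods used in RUNNING state together with commit handling, and finally status and IQ accessors. The hypothetical driver below strings these methods together purely to illustrate that order; the surrounding loop, error handling, and actual offset committing are invented or omitted, and initializeIfNeeded() is assumed to be the life-cycle method whose declaration falls between the hunks shown above.

    package org.apache.kafka.streams.processor.internals;

    // Hypothetical driver showing the call order implied by the regrouped Task interface;
    // not real TaskManager or StreamThread code.
    final class TaskLifecycleSketch {

        static void runOnce(final Task task, final long wallClockTime) {
            // idempotent life-cycle
            task.initializeIfNeeded();
            task.completeRestoration();

            // runtime methods (used in RUNNING state)
            while (task.process(wallClockTime)) {
                task.maybePunctuateStreamTime();
            }
            task.maybePunctuateSystemTime();

            // commit handling
            if (task.commitNeeded() || task.commitRequested()) {
                task.prepareCommit();   // the returned offsets would be committed by the caller
                task.postCommit();
            }

            // wind down
            task.suspend();
            task.closeClean();
        }
    }
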
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index fdbaf1c..4d2948a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -359,7 +359,7 @@ public class TaskManager {
             for (final TopicPartition topicPartition : topicPartitions) {
                 partitionToTask.put(topicPartition, task);
             }
-            task.update(topicPartitions, builder.nodeToSourceTopics());
+            task.updateInputPartitions(topicPartitions, builder.nodeToSourceTopics());
         }
         task.resume();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8784cf1..667d16b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -479,14 +479,15 @@ public class StandbyTaskTest {
 
         return new StandbyTask(
             taskId,
-            Collections.singleton(partition),
             topology,
-            config,
-            streamsMetrics,
-            stateManager,
             stateDirectory,
+            stateManager,
+            Collections.singleton(partition),
+            config,
+            context,
             cache,
-            context);
+            streamsMetrics
+        );
     }
 
     private MetricName setupCloseTaskMetric() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8f8bb54..030b630 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.HashSet;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -74,6 +73,7 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1452,17 +1452,18 @@ public class StreamTaskTest {
 
         task = new StreamTask(
             taskId,
-            mkSet(partition1, repartition),
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            mkSet(partition1, repartition),
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
 
         task.initializeIfNeeded();
         task.completeRestoration();
@@ -1777,7 +1778,7 @@ public class StreamTaskTest {
         final Set<TopicPartition> newPartitions = new HashSet<>(task.inputPartitions());
         newPartitions.add(new TopicPartition("newTopic", 0));
 
-        task.update(newPartitions, mkMap(
+        task.updateInputPartitions(newPartitions, mkMap(
             mkEntry(source1.name(), asList(topic1, "newTopic")),
             mkEntry(source2.name(), singletonList(topic2)))
         );
@@ -1828,17 +1829,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            mkSet(partition1),
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            mkSet(partition1),
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private StreamTask createDisconnectedTask(final StreamsConfig config) {
@@ -1867,17 +1869,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private StreamTask createFaultyStatefulTask(final StreamsConfig config) {
@@ -1897,17 +1900,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) {
@@ -1933,17 +1937,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private StreamTask createStatelessTask(final StreamsConfig config,
@@ -1972,17 +1977,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion),
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion),
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private ConsumerRecord<byte[], byte[]> getConsumerRecord(final TopicPartition topicPartition, final long offset) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 8be3c21..0c01b5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2706,7 +2706,8 @@ public class TaskManagerTest {
         }
 
         @Override
-        public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+        public void updateInputPartitions(final Set<TopicPartition> topicPartitions,
+                                          final Map<String, List<String>> nodeToSourceTopics) {
             inputPartitions = topicPartitions;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a171a46..81ba22d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -390,17 +390,18 @@ public class StreamThreadStateStoreProviderTest {
         );
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            clientSupplier.consumer,
-            streamsConfig,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            streamsConfig,
+            context,
             EasyMock.createNiceMock(ThreadCache.class),
+            streamsMetrics,
             new MockTime(),
-            stateManager,
-            recordCollector,
-            context);
+            clientSupplier.consumer,
+            recordCollector
+        );
     }
 
     private void mockThread(final boolean initialized) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2ca187b..82d892f 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -501,17 +501,17 @@ public class TopologyTestDriver implements Closeable {
 
             task = new StreamTask(
                 TASK_ID,
-                new HashSet<>(partitionsByInputTopic.values()),
                 processorTopology,
-                consumer,
-                streamsConfig,
-                streamsMetrics,
                 stateDirectory,
+                stateManager,
+                new HashSet<>(partitionsByInputTopic.values()),
+                streamsConfig,
+                context,
                 cache,
+                streamsMetrics,
                 mockWallClockTime,
-                stateManager,
-                recordCollector,
-                context
+                consumer,
+                recordCollector
             );
             task.initializeIfNeeded();
             task.completeRestoration();