You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/11 04:04:15 UTC

[kafka] 01/01: MINOR: code cleanup for Kafka Streams task classes

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-9441-refactor-tm
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1966d4363e91f5045b014ce375ce0a58ee2a5a79
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed Jun 10 20:05:25 2020 -0700

    MINOR: code cleanup for Kafka Streams task classes
---
 .../streams/processor/internals/AbstractTask.java  |  63 +-
 .../processor/internals/ActiveTaskCreator.java     |  14 +-
 .../streams/processor/internals/StandbyTask.java   | 146 ++--
 .../processor/internals/StandbyTaskCreator.java    |  16 +-
 .../streams/processor/internals/StreamTask.java    | 968 +++++++++++----------
 .../kafka/streams/processor/internals/Task.java    | 111 +--
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../processor/internals/StandbyTaskTest.java       |  11 +-
 .../processor/internals/StreamTaskTest.java        |  94 +-
 .../processor/internals/TaskManagerTest.java       |   3 +-
 .../StreamThreadStateStoreProviderTest.java        |  15 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  14 +-
 12 files changed, 743 insertions(+), 714 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index f59571b..8c342a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
     private Task.State state = CREATED;
-    protected Set<TopicPartition> inputPartitions;
-    protected ProcessorTopology topology;
 
     protected final TaskId id;
+    protected final ProcessorTopology topology;
     protected final StateDirectory stateDirectory;
     protected final ProcessorStateManager stateMgr;
 
+    protected Set<TopicPartition> inputPartitions;
+
     AbstractTask(final TaskId id,
                  final ProcessorTopology topology,
                  final StateDirectory stateDirectory,
                  final ProcessorStateManager stateMgr,
                  final Set<TopicPartition> inputPartitions) {
         this.id = id;
-        this.stateMgr = stateMgr;
         this.topology = topology;
-        this.inputPartitions = inputPartitions;
+        this.stateMgr = stateMgr;
         this.stateDirectory = stateDirectory;
+        this.inputPartitions = inputPartitions;
     }
 
     @Override
-    public TaskId id() {
-        return id;
+    public void revive() {
+        if (state == CLOSED) {
+            transitionTo(CREATED);
+        } else {
+            throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
+        }
     }
 
     @Override
-    public Set<TopicPartition> inputPartitions() {
-        return inputPartitions;
+    public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
+        stateMgr.markChangelogAsCorrupted(partitions);
     }
 
     @Override
-    public Collection<TopicPartition> changelogPartitions() {
-        return stateMgr.changelogPartitions();
+    public final TaskId id() {
+        return id;
     }
 
     @Override
-    public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
-        stateMgr.markChangelogAsCorrupted(partitions);
+    public final Task.State state() {
+        return state;
     }
 
     @Override
-    public StateStore getStore(final String name) {
-        return stateMgr.getStore(name);
+    public void updateInputPartitions(final Set<TopicPartition> topicPartitions,
+                                      final Map<String, List<String>> nodeToSourceTopics) {
+        this.inputPartitions = topicPartitions;
+        topology.updateSourceTopics(nodeToSourceTopics);
     }
 
     @Override
-    public boolean isClosed() {
-        return state() == State.CLOSED;
+    public Set<TopicPartition> inputPartitions() {
+        return inputPartitions;
     }
 
     @Override
-    public final Task.State state() {
-        return state;
+    public StateStore getStore(final String name) {
+        return stateMgr.getStore(name);
     }
 
     @Override
-    public void revive() {
-        if (state == CLOSED) {
-            transitionTo(CREATED);
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
-        }
+    public Collection<TopicPartition> changelogPartitions() {
+        return stateMgr.changelogPartitions();
     }
 
     final void transitionTo(final Task.State newState) {
@@ -118,10 +121,4 @@ public abstract class AbstractTask implements Task {
             }
         }
     }
-
-    @Override
-    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
-        this.inputPartitions = topicPartitions;
-        topology.updateSourceTopics(nodeToSourceTopics);
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 0a1f47e..4d2dc5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -230,17 +230,17 @@ class ActiveTaskCreator {
 
         final StreamTask task = new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context
+            consumer,
+            recordCollector
         );
 
         log.trace("Created task {} with assigned partitions {}", taskId, partitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 9c80fd7..90f0aeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -39,11 +39,13 @@ import java.util.Set;
  * A StandbyTask
  */
 public class StandbyTask extends AbstractTask implements Task {
-    private final Logger log;
     private final String logPrefix;
-    private final Sensor closeTaskSensor;
-    private final boolean eosEnabled;
+    private final Logger log;
     private final InternalProcessorContext processorContext;
+    private final boolean eosEnabled;
+
+    // metrics
+    private final Sensor closeTaskSensor;
 
     private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;
 
@@ -57,30 +59,31 @@ public class StandbyTask extends AbstractTask implements Task {
      * @param stateDirectory the {@link StateDirectory} created by the thread
      */
     StandbyTask(final TaskId id,
-                final Set<TopicPartition> partitions,
                 final ProcessorTopology topology,
-                final StreamsConfig config,
-                final StreamsMetricsImpl metrics,
-                final ProcessorStateManager stateMgr,
                 final StateDirectory stateDirectory,
+                final ProcessorStateManager stateMgr,
+                final Set<TopicPartition> partitions,
+                final StreamsConfig config,
+                final InternalProcessorContext processorContext,
                 final ThreadCache cache,
-                final InternalProcessorContext processorContext) {
+                final StreamsMetricsImpl metrics) {
         super(id, topology, stateDirectory, stateMgr, partitions);
-        this.processorContext = processorContext;
-        processorContext.transitionToStandby(cache);
 
+        // logging
         final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
-        logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", id);
+        logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", id.toString());
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
-        closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
+        // members
+        this.processorContext = processorContext;
+        processorContext.transitionToStandby(cache);
+
+        // config
         eosEnabled = StreamThread.eosEnabled(config);
-    }
 
-    @Override
-    public boolean isActive() {
-        return false;
+        // metrics
+        closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
     }
 
     /**
@@ -127,43 +130,6 @@ public class StandbyTask extends AbstractTask implements Task {
         log.trace("No-op resume with state {}", state());
     }
 
-    /**
-     * 1. flush store
-     * 2. write checkpoint file
-     *
-     * @throws StreamsException fatal error, should close the thread
-     */
-    @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
-        if (state() == State.RUNNING || state() == State.SUSPENDED) {
-            stateMgr.flush();
-            log.info("Task ready for committing");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
-        }
-
-        return Collections.emptyMap();
-    }
-
-    @Override
-    public void postCommit() {
-        if (state() == State.RUNNING || state() == State.SUSPENDED) {
-            // since there's no written offsets we can checkpoint with empty map,
-            // and the state current offset would be used to checkpoint
-            stateMgr.checkpoint(Collections.emptyMap());
-            offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
-            log.info("Finalized commit");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
-        }
-    }
-
-    @Override
-    public void closeClean() {
-        close(true);
-        log.info("Closed clean");
-    }
-
     @Override
     public void closeDirty() {
         close(false);
@@ -171,20 +137,9 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
-    public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            stateMgr.recycle();
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
-        }
-
-        closeTaskSensor.record();
-        transitionTo(State.CLOSED);
-
-        log.info("Closed clean and recycled state");
+    public void closeClean() {
+        close(true);
+        log.info("Closed clean");
     }
 
     private void close(final boolean clean) {
@@ -225,14 +180,57 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
+    public void closeAndRecycleState() {
+        suspend();
+        prepareCommit();
+
+        if (state() == State.CREATED || state() == State.SUSPENDED) {
+            stateMgr.recycle();
+        } else {
+            throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
+        }
+
+        closeTaskSensor.record();
+        transitionTo(State.CLOSED);
+
+        log.info("Closed clean and recycled state");
+    }
+
+    @Override
     public boolean commitNeeded() {
         // we can commit if the store's offset has changed since last commit
         return offsetSnapshotSinceLastCommit == null || !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
     }
 
+    /**
+     * 1. flush store
+     * 2. write checkpoint file
+     *
+     * @throws StreamsException fatal error, should close the thread
+     */
     @Override
-    public Map<TopicPartition, Long> changelogOffsets() {
-        return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+        if (state() == State.RUNNING || state() == State.SUSPENDED) {
+            stateMgr.flush();
+            log.info("Task ready for committing");
+        } else {
+            throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
+        }
+
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void postCommit() {
+        if (state() == State.RUNNING || state() == State.SUSPENDED) {
+            // since there are no written offsets, we can checkpoint with an empty map,
+            // and the state's current offsets will be used for the checkpoint
+            stateMgr.checkpoint(Collections.emptyMap());
+            offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+            log.info("Finalized commit");
+        } else {
+            throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
+        }
     }
 
     @Override
@@ -240,6 +238,16 @@ public class StandbyTask extends AbstractTask implements Task {
         throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
     }
 
+    @Override
+    public boolean isActive() {
+        return false;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> changelogOffsets() {
+        return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+    }
+
     InternalProcessorContext processorContext() {
         return processorContext;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 443db8e..d7a6586 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -94,7 +94,7 @@ class StandbyTaskCreator {
                     dummyCache
                 );
 
-                createdTasks.add(createStandbyTask(taskId, partitions, topology, stateManager, context));
+                createdTasks.add(createStandbyTask(taskId, topology, stateManager, partitions, context));
             } else {
                 log.trace(
                     "Skipped standby task {} with assigned partitions {} " +
@@ -117,28 +117,28 @@ class StandbyTaskCreator {
 
         return createStandbyTask(
             streamTask.id(),
-            partitions,
             builder.buildSubtopology(streamTask.id.topicGroupId),
             stateManager,
+            partitions,
             context
         );
     }
 
     StandbyTask createStandbyTask(final TaskId taskId,
-                                  final Set<TopicPartition> partitions,
                                   final ProcessorTopology topology,
                                   final ProcessorStateManager stateManager,
+                                  final Set<TopicPartition> partitions,
                                   final InternalProcessorContext context) {
         final StandbyTask task = new StandbyTask(
             taskId,
-            partitions,
             topology,
-            config,
-            streamsMetrics,
-            stateManager,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             dummyCache,
-            context
+            streamsMetrics
         );
 
         log.trace("Created task {} with assigned partitions {}", taskId, partitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3925acc..147f5ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -71,26 +71,26 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     // visible for testing
     static final byte LATEST_MAGIC_BYTE = 1;
 
-    private final Time time;
-    private final Logger log;
     private final String logPrefix;
+    private final Logger log;
+
+    private final InternalProcessorContext processorContext;
+    private final Time time;
     private final Consumer<byte[], byte[]> mainConsumer;
+    private final RecordCollector recordCollector;
+    private final PartitionGroup.RecordInfo recordInfo;
+    private final RecordQueueCreator recordQueueCreator;
+    private final PartitionGroup partitionGroup;
+    private final PunctuationQueue streamTimePunctuationQueue;
+    private final PunctuationQueue systemTimePunctuationQueue;
+    private final Map<TopicPartition, Long> consumedOffsets;
 
     // we want to abstract eos logic out of StreamTask, however
     // there's still an optimization that requires this info to be
     // leaked into this class, which is to checkpoint after committing if EOS is not enabled.
     private final boolean eosEnabled;
-
     private final long maxTaskIdleMs;
     private final int maxBufferedSize;
-    private final PartitionGroup partitionGroup;
-    private final RecordCollector recordCollector;
-    private final PartitionGroup.RecordInfo recordInfo;
-    private final Map<TopicPartition, Long> consumedOffsets;
-    private final PunctuationQueue streamTimePunctuationQueue;
-    private final PunctuationQueue systemTimePunctuationQueue;
-
-    private long processTimeMs = 0L;
 
     private final Sensor closeTaskSensor;
     private final Sensor processRatioSensor;
@@ -99,56 +99,73 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Sensor bufferedRecordsSensor;
     private final Sensor enforcedProcessingSensor;
     private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
-    private final InternalProcessorContext processorContext;
-
-    private final RecordQueueCreator recordQueueCreator;
 
-    private long idleStartTimeMs;
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
-
+    private long idleStartTimeMs = RecordQueue.UNKNOWN;
+    private long processTimeMs = 0L;
     private Map<TopicPartition, Long> checkpoint = null;
 
     public StreamTask(final TaskId id,
-                      final Set<TopicPartition> partitions,
                       final ProcessorTopology topology,
-                      final Consumer<byte[], byte[]> mainConsumer,
-                      final StreamsConfig config,
-                      final StreamsMetricsImpl streamsMetrics,
                       final StateDirectory stateDirectory,
+                      final ProcessorStateManager stateMgr,
+                      final Set<TopicPartition> partitions,
+                      final StreamsConfig config,
+                      final InternalProcessorContext processorContext,
                       final ThreadCache cache,
+                      final StreamsMetricsImpl streamsMetrics,
                       final Time time,
-                      final ProcessorStateManager stateMgr,
-                      final RecordCollector recordCollector,
-                      final InternalProcessorContext processorContext) {
+                      final Consumer<byte[], byte[]> mainConsumer,
+                      final RecordCollector recordCollector) {
         super(id, topology, stateDirectory, stateMgr, partitions);
-        this.mainConsumer = mainConsumer;
 
-        this.processorContext = processorContext;
-        processorContext.transitionToActive(this, recordCollector, cache);
+        final String taskId = id.toString();
+        final String threadId = Thread.currentThread().getName();
 
-        final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
-        logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", id);
+        // logging
+        final String threadIdPrefix = String.format("stream-thread [%s] ", threadId);
+        logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", taskId);
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
+        // members
+        this.processorContext = processorContext;
+        processorContext.transitionToActive(this, recordCollector, cache);
         this.time = time;
+        this.mainConsumer = mainConsumer;
         this.recordCollector = recordCollector;
+
+        recordInfo = new PartitionGroup.RecordInfo();
+        recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
+        partitionGroup = new PartitionGroup(
+            createPartitionQueues(),
+            TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics)
+        );
+        streamTimePunctuationQueue = new PunctuationQueue();
+        systemTimePunctuationQueue = new PunctuationQueue();
+        consumedOffsets = new HashMap<>();
+
+        stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // config
         eosEnabled = StreamThread.eosEnabled(config);
+        maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
-        final String threadId = Thread.currentThread().getName();
+        // metrics
         closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
-        final String taskId = id.toString();
+        processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
+        processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
+        punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
+        bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
+
         if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
             final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
             enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
         } else {
             enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
         }
-        processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
-        processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
-        punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
-        bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
 
         for (final String terminalNode : topology.terminalNodes()) {
             e2eLatencySensors.put(
@@ -164,22 +181,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, processorId, RecordingLevel.INFO, streamsMetrics)
             );
         }
-
-        streamTimePunctuationQueue = new PunctuationQueue();
-        systemTimePunctuationQueue = new PunctuationQueue();
-        maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
-        maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-
-        // initialize the consumed and committed offset cache
-        consumedOffsets = new HashMap<>();
-
-        recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
-
-        recordInfo = new PartitionGroup.RecordInfo();
-        partitionGroup = new PartitionGroup(createPartitionQueues(),
-                TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics));
-
-        stateMgr.registerGlobalStateStores(topology.globalStateStores());
     }
 
     // create queues for each assigned partition and associate them
@@ -192,11 +193,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return partitionQueues;
     }
 
-    @Override
-    public boolean isActive() {
-        return true;
-    }
-
     /**
      * @throws LockException    could happen when multi-threads within the single instance, could retry
      * @throws TimeoutException if initializing record collector timed out
@@ -246,6 +242,59 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
+    private void initializeMetadata() {
+        try {
+            final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions()).entrySet().stream()
+                .filter(e -> e.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            initializeTaskTime(offsetsAndMetadata);
+        } catch (final TimeoutException e) {
+            log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
+                    "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
+                e.toString(),
+                ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+
+            throw e;
+        } catch (final KafkaException e) {
+            throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", id, inputPartitions()), e);
+        }
+    }
+
+    private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
+        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
+            final TopicPartition partition = entry.getKey();
+            final OffsetAndMetadata metadata = entry.getValue();
+
+            if (metadata != null) {
+                final long committedTimestamp = decodeTimestamp(metadata.metadata());
+                partitionGroup.setPartitionTime(partition, committedTimestamp);
+                log.debug("A committed timestamp was detected: setting the partition time of partition {}"
+                    + " to {} in stream task {}", partition, committedTimestamp, id);
+            } else {
+                log.debug("No committed timestamp was found in metadata for partition {}", partition);
+            }
+        }
+
+        final Set<TopicPartition> nonCommitted = new HashSet<>(inputPartitions());
+        nonCommitted.removeAll(offsetsAndMetadata.keySet());
+        for (final TopicPartition partition : nonCommitted) {
+            log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition);
+        }
+    }
+
+    private void initializeTopology() {
+        // initialize the task by initializing all its processor nodes in the topology
+        log.trace("Initializing processor nodes of the topology");
+        for (final ProcessorNode<?, ?> node : topology.processors()) {
+            processorContext.setCurrentNode(node);
+            try {
+                node.init(processorContext);
+            } finally {
+                processorContext.setCurrentNode(null);
+            }
+        }
+    }
+
     @Override
     public void suspend() {
         switch (state()) {
@@ -333,296 +382,119 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
+    @Override
+    public void closeDirty() {
+        close(false);
+        log.info("Closed dirty");
+    }
+
+    @Override
+    public void closeClean() {
+        close(true);
+        log.info("Closed clean");
+    }
+
     /**
-     * @return offsets that should be committed for this task
+     * the following order must be followed:
+     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
+     *  2. then if we are closing on EOS and dirty, wipe out the state store directory
+     *  3. finally release the state manager lock
      */
-    @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+    private void close(final boolean clean) {
+        if (clean) {
+            executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
+        }
+
         switch (state()) {
-            case RUNNING:
+            case CREATED:
             case RESTORING:
             case SUSPENDED:
-                maybeScheduleCheckpoint();
-                stateMgr.flush();
-                recordCollector.flush();
+                // first close the state manager (which is idempotent), then close the record collector:
+                // if the latter throws and we re-close dirty, the state manager would be closed again.
+                executeAndMaybeSwallow(
+                    clean,
+                    () -> StateManagerUtil.closeStateManager(
+                        log,
+                        logPrefix,
+                        clean,
+                        eosEnabled,
+                        stateMgr,
+                        stateDirectory,
+                        TaskType.ACTIVE
+                    ),
+                    "state manager close",
+                    log);
 
-                log.debug("Prepared task for committing");
+                executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
 
                 break;
 
-            case CREATED:
             case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");
+                log.trace("Skip closing since state is {}", state());
+                return;
+
+            case RUNNING:
+                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
 
             default:
-                throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
+                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
         }
 
-        return committableOffsetsAndMetadata();
+        partitionGroup.clear();
+        closeTaskSensor.record();
+
+        transitionTo(State.CLOSED);
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
-        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
+    @Override
+    public void closeAndRecycleState() {
+        suspend();
+        prepareCommit();
+        writeCheckpointIfNeed();
 
         switch (state()) {
             case CREATED:
-            case RESTORING:
-                committableOffsets = Collections.emptyMap();
-
-                break;
-
-            case RUNNING:
             case SUSPENDED:
-                final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
-
-                committableOffsets = new HashMap<>(consumedOffsets.size());
-                for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
-                    final TopicPartition partition = entry.getKey();
-                    Long offset = partitionGroup.headRecordOffset(partition);
-                    if (offset == null) {
-                        try {
-                            offset = mainConsumer.position(partition);
-                        } catch (final TimeoutException error) {
-                            // the `consumer.position()` call should never block, because we know that we did process data
-                            // for the requested partition and thus the consumer should have a valid local position
-                            // that it can return immediately
-
-                            // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
-                            throw new IllegalStateException(error);
-                        } catch (final KafkaException fatal) {
-                            throw new StreamsException(fatal);
-                        }
-                    }
-                    final long partitionTime = partitionTimes.get(partition);
-                    committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
-                }
+                stateMgr.recycle();
+                recordCollector.close();
 
                 break;
 
+            case RESTORING: // we should have transitioned to `SUSPENDED` already
+            case RUNNING: // we should have transitioned to `SUSPENDED` already
             case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while getting commitable offsets for active task " + id);
-
+                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
             default:
-                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
+                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
         }
 
-        return committableOffsets;
-    }
-
-    @Override
-    public void postCommit() {
-        commitRequested = false;
-        commitNeeded = false;
+        partitionGroup.clear();
+        closeTaskSensor.record();
 
-        switch (state()) {
-            case RESTORING:
-                writeCheckpointIfNeed();
+        transitionTo(State.CLOSED);
 
-                break;
-
-            case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
-                }
-
-                break;
-
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();
-
-                break;
-
-            case CREATED:
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
-        }
-
-        log.debug("Committed");
-    }
-
-    private Map<TopicPartition, Long> extractPartitionTimes() {
-        final Map<TopicPartition, Long> partitionTimes = new HashMap<>();
-        for (final TopicPartition partition : partitionGroup.partitions()) {
-            partitionTimes.put(partition, partitionGroup.partitionTimestamp(partition));
-        }
-        return partitionTimes;
-    }
-
-    @Override
-    public void closeClean() {
-        close(true);
-        log.info("Closed clean");
-    }
-
-    @Override
-    public void closeDirty() {
-        close(false);
-        log.info("Closed dirty");
-    }
-
-    @Override
-    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
-        super.update(topicPartitions, nodeToSourceTopics);
-        partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
-    }
-
-    @Override
-    public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
-        switch (state()) {
-            case CREATED:
-            case SUSPENDED:
-                stateMgr.recycle();
-                recordCollector.close();
-
-                break;
-
-            case RESTORING: // we should have transitioned to `SUSPENDED` already
-            case RUNNING: // we should have transitioned to `SUSPENDED` already
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
-        }
-
-        partitionGroup.clear();
-        closeTaskSensor.record();
-
-        transitionTo(State.CLOSED);
-
-        log.info("Closed clean and recycled state");
-    }
-
-    private void maybeScheduleCheckpoint() {
-        switch (state()) {
-            case RESTORING:
-            case SUSPENDED:
-                this.checkpoint = checkpointableOffsets();
-
-                break;
-
-            case RUNNING:
-                if (!eosEnabled) {
-                    this.checkpoint = checkpointableOffsets();
-                }
-
-                break;
-
-            case CREATED:
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
-        }
-    }
-
-    private void writeCheckpointIfNeed() {
-        if (commitNeeded) {
-            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
-        }
-        if (checkpoint != null) {
-            stateMgr.checkpoint(checkpoint);
-            checkpoint = null;
-        }
-    }
+        log.info("Closed clean and recycled state");
+    }
 
     /**
-     * <pre>
-     * the following order must be followed:
-     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
-     *  2. then if we are closing on EOS and dirty, wipe out the state store directory
-     *  3. finally release the state manager lock
-     * </pre>
-     */
-    private void close(final boolean clean) {
-        if (clean) {
-            executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
-        }
-
-        switch (state()) {
-            case CREATED:
-            case RESTORING:
-            case SUSPENDED:
-                // first close state manager (which is idempotent) then close the record collector
-                // if the latter throws and we re-close dirty which would close the state manager again.
-                executeAndMaybeSwallow(
-                    clean,
-                    () -> StateManagerUtil.closeStateManager(
-                        log,
-                        logPrefix,
-                        clean,
-                        eosEnabled,
-                        stateMgr,
-                        stateDirectory,
-                        TaskType.ACTIVE
-                    ),
-                    "state manager close",
-                    log);
-
-                executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
-
-                break;
-
-            case CLOSED:
-                log.trace("Skip closing since state is {}", state());
-                return;
-
-            case RUNNING:
-                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
-        }
-
-        partitionGroup.clear();
-        closeTaskSensor.record();
-
-        transitionTo(State.CLOSED);
-    }
-
-    /**
-     * An active task is processable if its buffer contains data for all of its input
-     * source topic partitions, or if it is enforced to be processable
+     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
+     * and not added to the queue for processing
+     *
+     * @param partition the partition
+     * @param records   the records
      */
-    public boolean isProcessable(final long wallClockTime) {
-        if (state() == State.CLOSED) {
-            // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
-            // in either case we can just log it and move on without notifying the thread since the consumer
-            // would soon be updated to not return any records for this task anymore.
-            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+    @Override
+    public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
-            return false;
+        if (log.isTraceEnabled()) {
+            log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
         }
 
-        if (partitionGroup.allPartitionsBuffered()) {
-            idleStartTimeMs = RecordQueue.UNKNOWN;
-            return true;
-        } else if (partitionGroup.numBuffered() > 0) {
-            if (idleStartTimeMs == RecordQueue.UNKNOWN) {
-                idleStartTimeMs = wallClockTime;
-            }
-
-            if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
-                enforcedProcessingSensor.record(1.0d, wallClockTime);
-                return true;
-            } else {
-                return false;
-            }
-        } else {
-            // there's no data in any of the topics; we should reset the enforced
-            // processing timer
-            idleStartTimeMs = RecordQueue.UNKNOWN;
-            return false;
+        // if after adding these records, its partition queue's buffered size has been
+        // increased beyond the threshold, we can then pause the consumption for this partition
+        if (newQueueSize > maxBufferedSize) {
+            mainConsumer.pause(singleton(partition));
         }
     }
 
@@ -673,13 +545,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         } catch (final RuntimeException e) {
             final String stackTrace = getStacktraceString(e);
             throw new StreamsException(String.format("Exception caught in process. taskId=%s, " +
-                                                  "processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
-                                              id(),
-                                              processorContext.currentNode().name(),
-                                              record.topic(),
-                                              record.partition(),
-                                              record.offset(),
-                                              stackTrace
+                    "processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
+                id(),
+                processorContext.currentNode().name(),
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                stackTrace
             ), e);
         } finally {
             processorContext.setCurrentNode(null);
@@ -688,6 +560,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return true;
     }
 
+    private String getStacktraceString(final RuntimeException e) {
+        String stacktrace = null;
+        try (final StringWriter stringWriter = new StringWriter();
+             final PrintWriter printWriter = new PrintWriter(stringWriter)) {
+            e.printStackTrace(printWriter);
+            stacktrace = stringWriter.toString();
+        } catch (final IOException ioe) {
+            log.error("Encountered error extracting stacktrace from this exception", ioe);
+        }
+        return stacktrace;
+    }
+
     @Override
     public void recordProcessBatchTime(final long processBatchTime) {
         processTimeMs += processBatchTime;
@@ -700,16 +584,50 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         processTimeMs = 0L;
     }
 
-    private String getStacktraceString(final RuntimeException e) {
-        String stacktrace = null;
-        try (final StringWriter stringWriter = new StringWriter();
-             final PrintWriter printWriter = new PrintWriter(stringWriter)) {
-            e.printStackTrace(printWriter);
-            stacktrace = stringWriter.toString();
-        } catch (final IOException ioe) {
-            log.error("Encountered error extracting stacktrace from this exception", ioe);
+    /**
+     * Possibly trigger registered stream-time punctuation functions if
+     * current partition group timestamp has reached the defined stamp
+     * Note, this is only called in the presence of new records
+     *
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
+    @Override
+    public boolean maybePunctuateStreamTime() {
+        final long streamTime = partitionGroup.streamTime();
+
+        // if the timestamp is not known yet, meaning there is not enough data accumulated
+        // to reason about stream partition time, then skip.
+        if (streamTime == RecordQueue.UNKNOWN) {
+            return false;
+        } else {
+            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
+
+            if (punctuated) {
+                commitNeeded = true;
+            }
+
+            return punctuated;
         }
-        return stacktrace;
+    }
+
+    /**
+     * Possibly trigger registered system-time punctuation functions if
+     * current system timestamp has reached the defined stamp
+     * Note, this is called irrespective of the presence of new records
+     *
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
+    @Override
+    public boolean maybePunctuateSystemTime() {
+        final long systemTime = time.milliseconds();
+
+        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
+
+        if (punctuated) {
+            commitNeeded = true;
+        }
+
+        return punctuated;
     }
 
     /**
@@ -742,16 +660,69 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?> currNode, final long wallClockTime) {
-        processorContext.setRecordContext(
-            new ProcessorRecordContext(
-                record.timestamp,
-                record.offset(),
-                record.partition(),
-                record.topic(),
-                record.headers()));
-        processorContext.setCurrentNode(currNode);
-        processorContext.setSystemTimeMs(wallClockTime);
+    @Override
+    public boolean commitNeeded() {
+        return commitNeeded;
+    }
+
+    /**
+     * Whether or not a request has been made to commit the current state
+     */
+    @Override
+    public boolean commitRequested() {
+        return commitRequested;
+    }
+
+    /**
+     * @return offsets that should be committed for this task
+     */
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+        switch (state()) {
+            case RUNNING:
+            case RESTORING:
+            case SUSPENDED:
+                maybeScheduleCheckpoint();
+                stateMgr.flush();
+                recordCollector.flush();
+
+                log.debug("Prepared task for committing");
+
+                break;
+
+            case CREATED:
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
+        }
+
+        return committableOffsetsAndMetadata();
+    }
+
+    private void maybeScheduleCheckpoint() {
+        switch (state()) {
+            case RESTORING:
+            case SUSPENDED:
+                this.checkpoint = checkpointableOffsets();
+
+                break;
+
+            case RUNNING:
+                if (!eosEnabled) {
+                    this.checkpoint = checkpointableOffsets();
+                }
+
+                break;
+
+            case CREATED:
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
+        }
     }
 
     /**
@@ -763,47 +734,100 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
             checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
         }
-        return checkpointableOffsets;
+        return checkpointableOffsets;
+    }
+
+    /** @return offsets, with the encoded partition time as metadata, to commit in the current state (empty if CREATED or RESTORING) */
+    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
+        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
+
+        switch (state()) {
+            case CREATED:
+            case RESTORING:
+                committableOffsets = Collections.emptyMap();
+
+                break;
+
+            case RUNNING:
+            case SUSPENDED:
+                final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
+
+                committableOffsets = new HashMap<>(consumedOffsets.size());
+                for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+                    final TopicPartition partition = entry.getKey();
+                    Long offset = partitionGroup.headRecordOffset(partition);
+                    if (offset == null) {
+                        try {
+                            offset = mainConsumer.position(partition);
+                        } catch (final TimeoutException error) {
+                            // the `consumer.position()` call should never block, because we know that we did process data
+                            // for the requested partition and thus the consumer should have a valid local position
+                            // that it can return immediately;
+                            // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+                            throw new IllegalStateException(error);
+                        } catch (final KafkaException fatal) {
+                            throw new StreamsException(fatal);
+                        }
+                    }
+                    final long partitionTime = partitionTimes.get(partition);
+                    committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
+                }
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while getting committable offsets for active task " + id);
+        }
+
+        return committableOffsets;
+    }
+
+    private Map<TopicPartition, Long> extractPartitionTimes() {
+        final Map<TopicPartition, Long> partitionTimes = new HashMap<>();
+        for (final TopicPartition partition : partitionGroup.partitions()) {
+            partitionTimes.put(partition, partitionGroup.partitionTimestamp(partition));
+        }
+        return partitionTimes;
     }
 
-    private void initializeMetadata() {
-        try {
-            final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions()).entrySet().stream()
-                    .filter(e -> e.getValue() != null)
-                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-            initializeTaskTime(offsetsAndMetadata);
-        } catch (final TimeoutException e) {
-            log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
-                            "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
-                    e.toString(),
-                    ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+    @Override
+    public void postCommit() {
+        commitRequested = false;
+        commitNeeded = false;
 
-            throw e;
-        } catch (final KafkaException e) {
-            throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", id, inputPartitions()), e);
-        }
-    }
+        switch (state()) {
+            case RESTORING:
+                writeCheckpointIfNeed();
 
-    private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
-        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
-            final TopicPartition partition = entry.getKey();
-            final OffsetAndMetadata metadata = entry.getValue();
+                break;
 
-            if (metadata != null) {
-                final long committedTimestamp = decodeTimestamp(metadata.metadata());
-                partitionGroup.setPartitionTime(partition, committedTimestamp);
-                log.debug("A committed timestamp was detected: setting the partition time of partition {}"
-                    + " to {} in stream task {}", partition, committedTimestamp, id);
-            } else {
-                log.debug("No committed timestamp was found in metadata for partition {}", partition);
-            }
-        }
+            case RUNNING:
+                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
+                    writeCheckpointIfNeed();
+                }
 
-        final Set<TopicPartition> nonCommitted = new HashSet<>(inputPartitions());
-        nonCommitted.removeAll(offsetsAndMetadata.keySet());
-        for (final TopicPartition partition : nonCommitted) {
-            log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition);
+                break;
+
+            case SUSPENDED:
+                writeCheckpointIfNeed();
+                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
+                // because otherwise we lose the partition-time information
+                partitionGroup.clear();
+
+                break;
+
+            case CREATED:
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
         }
+
+        log.debug("Committed");
     }
 
     @Override
@@ -819,38 +843,72 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return purgeableConsumedOffsets;
     }
 
-    private void initializeTopology() {
-        // initialize the task by initializing all its processor nodes in the topology
-        log.trace("Initializing processor nodes of the topology");
-        for (final ProcessorNode<?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
-            try {
-                node.init(processorContext);
-            } finally {
-                processorContext.setCurrentNode(null);
-            }
+    @Override
+    public boolean isActive() {
+        return true;
+    }
+
+    @Override
+    public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+        super.updateInputPartitions(topicPartitions, nodeToSourceTopics);
+        partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
+    }
+
+    @Override
+    public Map<TopicPartition, Long> changelogOffsets() {
+        if (state() == State.RUNNING) {
+            // if we are in running state, just return the latest offset sentinel indicating
+            // we should be at the end of the changelog
+            return changelogPartitions().stream()
+                .collect(Collectors.toMap(Function.identity(), tp -> Task.LATEST_OFFSET));
+        } else {
+            return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+        }
+    }
+
+    private void writeCheckpointIfNeed() {
+        if (commitNeeded) {
+            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
+        }
+        if (checkpoint != null) {
+            stateMgr.checkpoint(checkpoint);
+            checkpoint = null;
         }
     }
 
     /**
-     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
-     * and not added to the queue for processing
-     *
-     * @param partition the partition
-     * @param records   the records
+     * An active task is processable if its buffer contains data for all of its input
+     * source topic partitions, or if it is enforced to be processable
      */
-    @Override
-    public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
+    public boolean isProcessable(final long wallClockTime) {
+        if (state() == State.CLOSED) {
+            // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is in progress;
+            // in either case we can just log it and move on without notifying the thread since the consumer
+            // would soon be updated to not return any records for this task anymore.
+            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
 
-        if (log.isTraceEnabled()) {
-            log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
+            return false;
         }
 
-        // if after adding these records, its partition queue's buffered size has been
-        // increased beyond the threshold, we can then pause the consumption for this partition
-        if (newQueueSize > maxBufferedSize) {
-            mainConsumer.pause(singleton(partition));
+        if (partitionGroup.allPartitionsBuffered()) {
+            idleStartTimeMs = RecordQueue.UNKNOWN;
+            return true;
+        } else if (partitionGroup.numBuffered() > 0) {
+            if (idleStartTimeMs == RecordQueue.UNKNOWN) {
+                idleStartTimeMs = wallClockTime;
+            }
+
+            if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
+                enforcedProcessingSensor.record(1.0d, wallClockTime);
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            // there's no data in any of the topics; we should reset the enforced
+            // processing timer
+            idleStartTimeMs = RecordQueue.UNKNOWN;
+            return false;
         }
     }
 
@@ -902,48 +960,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    /**
-     * Possibly trigger registered stream-time punctuation functions if
-     * current partition group timestamp has reached the defined stamp
-     * Note, this is only called in the presence of new records
-     *
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
-    public boolean maybePunctuateStreamTime() {
-        final long streamTime = partitionGroup.streamTime();
-
-        // if the timestamp is not known yet, meaning there is not enough data accumulated
-        // to reason stream partition time, then skip.
-        if (streamTime == RecordQueue.UNKNOWN) {
-            return false;
-        } else {
-            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
-
-            if (punctuated) {
-                commitNeeded = true;
-            }
-
-            return punctuated;
-        }
+    private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?> currNode, final long wallClockTime) {
+        processorContext.setRecordContext(
+            new ProcessorRecordContext(
+                record.timestamp,
+                record.offset(),
+                record.partition(),
+                record.topic(),
+                record.headers()));
+        processorContext.setCurrentNode(currNode);
+        processorContext.setSystemTimeMs(wallClockTime);
     }
 
     /**
-     * Possibly trigger registered system-time punctuation functions if
-     * current system timestamp has reached the defined stamp
-     * Note, this is called irrespective of the presence of new records
-     *
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     * Request committing the current task's state
      */
-    public boolean maybePunctuateSystemTime() {
-        final long systemTime = time.milliseconds();
-
-        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
-
-        if (punctuated) {
-            commitNeeded = true;
-        }
-
-        return punctuated;
+    void requestCommit() {
+        commitRequested = true;
     }
 
     void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
@@ -959,21 +992,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    /**
-     * Request committing the current task's state
-     */
-    void requestCommit() {
-        commitRequested = true;
+    public InternalProcessorContext processorContext() {
+        return processorContext;
     }
 
-    /**
-     * Whether or not a request has been made to commit the current state
-     */
-    @Override
-    public boolean commitRequested() {
-        return commitRequested;
+    public boolean hasRecordsQueued() {
+        return numBuffered() > 0;
     }
 
+    // visible for testing
     static String encodeTimestamp(final long partitionTime) {
         final ByteBuffer buffer = ByteBuffer.allocate(9);
         buffer.put(LATEST_MAGIC_BYTE);
@@ -981,6 +1008,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return Base64.getEncoder().encodeToString(buffer.array());
     }
 
+    // visible for testing
     long decodeTimestamp(final String encryptedString) {
         if (encryptedString.isEmpty()) {
             return RecordQueue.UNKNOWN;
@@ -992,15 +1020,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 return buffer.getLong();
             default:
                 log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
-                         LATEST_MAGIC_BYTE, version);
+                    LATEST_MAGIC_BYTE, version);
                 return RecordQueue.UNKNOWN;
         }
     }
 
-    public InternalProcessorContext processorContext() {
-        return processorContext;
+
+
+    // for testing only
+
+    RecordCollector recordCollector() {
+        return recordCollector;
+    }
+
+    int numBuffered() {
+        return partitionGroup.numBuffered();
+    }
+
+    long streamTime() {
+        return partitionGroup.streamTime();
     }
 
+
+
     /**
      * Produces a string representation containing useful information about a Task.
      * This is useful in debugging scenarios.
@@ -1043,40 +1085,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return sb.toString();
     }
 
-    @Override
-    public boolean commitNeeded() {
-        return commitNeeded;
-    }
-
-    @Override
-    public Map<TopicPartition, Long> changelogOffsets() {
-        if (state() == State.RUNNING) {
-            // if we are in running state, just return the latest offset sentinel indicating
-            // we should be at the end of the changelog
-            return changelogPartitions().stream()
-                                        .collect(Collectors.toMap(Function.identity(), tp -> Task.LATEST_OFFSET));
-        } else {
-            return Collections.unmodifiableMap(stateMgr.changelogOffsets());
-        }
-    }
-
-    public boolean hasRecordsQueued() {
-        return numBuffered() > 0;
-    }
-
-    RecordCollector recordCollector() {
-        return recordCollector;
-    }
-
-    // below are visible for testing only
-    int numBuffered() {
-        return partitionGroup.numBuffered();
-    }
-
-    long streamTime() {
-        return partitionGroup.streamTime();
-    }
-
     private class RecordQueueCreator {
         private final LogContext logContext;
         private final TimestampExtractor defaultTimestampExtractor;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 62332c7..2032c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -29,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -97,13 +97,9 @@ public interface Task {
         }
     }
 
-    TaskId id();
 
-    State state();
 
-    boolean isActive();
-
-    boolean isClosed();
+    // idempotent life-cycle methods
 
     /**
      * @throws LockException could happen when multi-threads within the single instance, could retry
@@ -116,89 +112,100 @@ public interface Task {
      */
     void completeRestoration();
 
-    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
-    void postCommit();
-
     void suspend();
 
     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();
 
-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();
 
-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods
 
     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void revive();
 
     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();
 
-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set<TopicPartition> inputPartitions();
+    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection<TopicPartition> changelogPartitions();
 
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     *         indicating the current positions of the logged state stores of the task.
-     */
-    Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (used in RUNNING state)
 
-    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-    default Map<TopicPartition, Long> purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }
 
     default void recordProcessBatchTime(final long processBatchTime) {}
 
     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }
 
-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }
 
-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }
 
-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+    void postCommit();
+
+    default Map<TopicPartition, Long> purgeableOffsets() {
+        return Collections.emptyMap();
     }
 
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+
+    Set<TopicPartition> inputPartitions();
+
+    /**
+     * @return any changelog partitions associated with this task
+     */
+    Collection<TopicPartition> changelogPartitions();
+
+
+    // IQ related methods
+
+    StateStore getStore(final String name);
+
+    /**
+     * @return the offsets of all the changelog partitions associated with this task,
+     *         indicating the current positions of the logged state stores of the task.
+     */
+    Map<TopicPartition, Long> changelogOffsets();
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index fdbaf1c..4d2948a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -359,7 +359,7 @@ public class TaskManager {
             for (final TopicPartition topicPartition : topicPartitions) {
                 partitionToTask.put(topicPartition, task);
             }
-            task.update(topicPartitions, builder.nodeToSourceTopics());
+            task.updateInputPartitions(topicPartitions, builder.nodeToSourceTopics());
         }
         task.resume();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8784cf1..667d16b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -479,14 +479,15 @@ public class StandbyTaskTest {
 
         return new StandbyTask(
             taskId,
-            Collections.singleton(partition),
             topology,
-            config,
-            streamsMetrics,
-            stateManager,
             stateDirectory,
+            stateManager,
+            Collections.singleton(partition),
+            config,
+            context,
             cache,
-            context);
+            streamsMetrics
+        );
     }
 
     private MetricName setupCloseTaskMetric() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8f8bb54..030b630 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.HashSet;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -74,6 +73,7 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1452,17 +1452,18 @@ public class StreamTaskTest {
 
         task = new StreamTask(
             taskId,
-            mkSet(partition1, repartition),
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            mkSet(partition1, repartition),
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
 
         task.initializeIfNeeded();
         task.completeRestoration();
@@ -1777,7 +1778,7 @@ public class StreamTaskTest {
         final Set<TopicPartition> newPartitions = new HashSet<>(task.inputPartitions());
         newPartitions.add(new TopicPartition("newTopic", 0));
 
-        task.update(newPartitions, mkMap(
+        task.updateInputPartitions(newPartitions, mkMap(
             mkEntry(source1.name(), asList(topic1, "newTopic")),
             mkEntry(source2.name(), singletonList(topic2)))
         );
@@ -1828,17 +1829,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            mkSet(partition1),
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            mkSet(partition1),
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private StreamTask createDisconnectedTask(final StreamsConfig config) {
@@ -1867,17 +1869,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private StreamTask createFaultyStatefulTask(final StreamsConfig config) {
@@ -1897,17 +1900,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) {
@@ -1933,17 +1937,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            streamsMetrics,
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private StreamTask createStatelessTask(final StreamsConfig config,
@@ -1972,17 +1977,18 @@ public class StreamTaskTest {
 
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            consumer,
-            config,
-            new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion),
             stateDirectory,
+            stateManager,
+            partitions,
+            config,
+            context,
             cache,
+            new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion),
             time,
-            stateManager,
-            recordCollector,
-            context);
+            consumer,
+            recordCollector
+        );
     }
 
     private ConsumerRecord<byte[], byte[]> getConsumerRecord(final TopicPartition topicPartition, final long offset) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 8be3c21..0c01b5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2706,7 +2706,8 @@ public class TaskManagerTest {
         }
 
         @Override
-        public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+        public void updateInputPartitions(final Set<TopicPartition> topicPartitions,
+                                          final Map<String, List<String>> nodeToSourceTopics) {
             inputPartitions = topicPartitions;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a171a46..81ba22d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -390,17 +390,18 @@ public class StreamThreadStateStoreProviderTest {
         );
         return new StreamTask(
             taskId,
-            partitions,
             topology,
-            clientSupplier.consumer,
-            streamsConfig,
-            streamsMetrics,
             stateDirectory,
+            stateManager,
+            partitions,
+            streamsConfig,
+            context,
             EasyMock.createNiceMock(ThreadCache.class),
+            streamsMetrics,
             new MockTime(),
-            stateManager,
-            recordCollector,
-            context);
+            clientSupplier.consumer,
+            recordCollector
+        );
     }
 
     private void mockThread(final boolean initialized) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2ca187b..82d892f 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -501,17 +501,17 @@ public class TopologyTestDriver implements Closeable {
 
             task = new StreamTask(
                 TASK_ID,
-                new HashSet<>(partitionsByInputTopic.values()),
                 processorTopology,
-                consumer,
-                streamsConfig,
-                streamsMetrics,
                 stateDirectory,
+                stateManager,
+                new HashSet<>(partitionsByInputTopic.values()),
+                streamsConfig,
+                context,
                 cache,
+                streamsMetrics,
                 mockWallClockTime,
-                stateManager,
-                recordCollector,
-                context
+                consumer,
+                recordCollector
             );
             task.initializeIfNeeded();
             task.completeRestoration();