Posted to commits@kafka.apache.org by ca...@apache.org on 2022/06/16 14:06:17 UTC

[kafka] branch trunk updated: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7ed3748a46 KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)
7ed3748a46 is described below

commit 7ed3748a462cd1ce7c30bb9a331b94a3cd79a401
Author: James Hughes <jn...@gmail.com>
AuthorDate: Thu Jun 16 10:06:02 2022 -0400

    KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)
    
    This PR adds the ability to pause and resume KafkaStreams instances as well as named/modular topologies (KIP-834).
    
    Co-authored-by: Bruno Cadonna <ca...@apache.org>
    
    Reviewers: Bonnie Varghese <bv...@confluent.io>, Walker Carlson <wc...@confluent.io>, Guozhang Wang <gu...@apache.org>, Bruno Cadonna <ca...@apache.org>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |  48 ++-
 .../processor/internals/ActiveTaskCreator.java     |   2 +-
 .../processor/internals/StoreChangelogReader.java  |  55 ++-
 .../streams/processor/internals/StreamThread.java  |   3 +-
 .../processor/internals/TaskExecutionMetadata.java |  14 +-
 .../streams/processor/internals/TaskExecutor.java  |   2 +-
 .../streams/processor/internals/TaskManager.java   |   7 +-
 .../kafka/streams/processor/internals/Tasks.java   |  24 +-
 .../processor/internals/TopologyMetadata.java      |  36 +-
 .../KafkaStreamsNamedTopologyWrapper.java          |  25 ++
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  64 +++-
 .../integration/PauseResumeIntegrationTest.java    | 422 +++++++++++++++++++++
 .../integration/utils/IntegrationTestUtils.java    |  35 ++
 .../internals/StoreChangelogReaderTest.java        |   2 +-
 .../processor/internals/StreamThreadTest.java      | 154 ++++++--
 .../internals/TaskExecutionMetadataTest.java       | 124 ++++++
 .../processor/internals/TaskExecutorTest.java      |  41 ++
 .../streams/processor/internals/TasksTest.java     |  65 ++++
 .../processor/internals/TopologyMetadataTest.java  |  49 +++
 20 files changed, 1125 insertions(+), 49 deletions(-)
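
For reference while reading the diff, the user-facing surface added by KIP-834 consists of three methods on KafkaStreams: pause(), isPaused(), and resume(). A minimal usage sketch (the application id, bootstrap servers, serdes, and topic names below are illustrative, not taken from this patch):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class PauseResumeExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pause-resume-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
                streams.start();
                streams.pause();    // processing and punctuation stop; consumers keep polling and committing
                System.out.println(streams.isPaused());  // true
                streams.resume();   // processing picks up again from the committed positions
                System.out.println(streams.isPaused());  // false
            }
        }
    }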

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f2c24541a1..033017f0fe 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -225,7 +225,7 @@
               files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
 
     <suppress checks="JavaNCSS"
-              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
+              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
 
     <suppress checks="NPathComplexity"
               files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ed62f89ebc..86ab83f67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -56,7 +56,6 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ClientUtils;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -65,6 +64,7 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
 import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryConfig;
@@ -111,6 +111,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
 
 /**
  * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -1735,6 +1736,51 @@ public class KafkaStreams implements AutoCloseable {
         return queryableStoreProvider.getStore(storeQueryParameters);
     }
 
+    /**
+     *  This method pauses processing for the KafkaStreams instance.
+     *
+     *  Paused topologies will skip over a) processing, b) punctuation, and c) updating standby tasks.
+     *  Notably, paused topologies will still poll Kafka consumers and commit offsets.
+     *  The pause is transient state that is neither persisted nor shared among instances.
+     *  Note that pause() can be called before start(), in which case the instance starts up
+     *  with processing paused as described while its consumers are nevertheless started.
+     */
+    public void pause() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+                topologyMetadata.pauseTopology(namedTopology.name());
+            }
+        } else {
+            topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
+        }
+    }
+
+    /**
+     * @return true when the KafkaStreams instance has its processing paused.
+     */
+    public boolean isPaused() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            return topologyMetadata.getAllNamedTopologies().stream()
+                .map(NamedTopology::name)
+                .allMatch(topologyMetadata::isPaused);
+        } else {
+            return topologyMetadata.isPaused(UNNAMED_TOPOLOGY);
+        }
+    }
+
+    /**
+     * This method resumes processing for the KafkaStreams instance.
+     */
+    public void resume() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+                topologyMetadata.resumeTopology(namedTopology.name());
+            }
+        } else {
+            topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY);
+        }
+    }
+
     /**
      * handle each stream thread in a snapshot of threads.
      * noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
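
As the pause() javadoc above notes, the flag can be set before start(). A brief sketch of that ordering (reusing the illustrative builder and props from the sketch above):

    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.pause();    // transient, instance-local flag; valid before the threads exist
    streams.start();    // consumers join the group and poll, but nothing is processed
    // ... later:
    streams.resume();   // begin processing
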
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index d90266dbf4..e7832df6b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -151,7 +151,7 @@ class ActiveTaskCreator {
     }
 
     // TODO: change return type to `StreamTask`
-    Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
+    public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
                                  final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
         // TODO: change type to `StreamTask`
         final List<Task> createdTasks = new ArrayList<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 756bf11b0a..5c4f0b12e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.slf4j.Logger;
 
 import java.time.Duration;
@@ -429,6 +430,8 @@ public class StoreChangelogReader implements ChangelogReader {
             final ConsumerRecords<byte[], byte[]> polledRecords;
 
             try {
+                pauseResumePartitions(tasks, restoringChangelogs);
+
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
@@ -463,7 +466,10 @@ public class StoreChangelogReader implements ChangelogReader {
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                 try {
                     if (restoreChangelog(changelogs.get(partition))) {
-                        tasks.get(taskId).clearTaskTimeout();
+                        final Task task = tasks.get(taskId);
+                        if (task != null) {
+                            task.clearTaskTimeout();
+                        }
                     }
                 } catch (final TimeoutException timeoutException) {
                     tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
@@ -479,6 +485,47 @@ public class StoreChangelogReader implements ChangelogReader {
         }
     }
 
+    private void pauseResumePartitions(final Map<TaskId, Task> tasks,
+                                       final Set<TopicPartition> restoringChangelogs) {
+        if (state == ChangelogReaderState.ACTIVE_RESTORING) {
+            updatePartitionsByType(tasks, restoringChangelogs, TaskType.ACTIVE);
+        }
+        if (state == ChangelogReaderState.STANDBY_UPDATING) {
+            updatePartitionsByType(tasks, restoringChangelogs, TaskType.STANDBY);
+        }
+    }
+
+    private void updatePartitionsByType(final Map<TaskId, Task> tasks,
+                                        final Set<TopicPartition> restoringChangelogs,
+                                        final TaskType taskType) {
+        final Collection<TopicPartition> toResume =
+            restoringChangelogs.stream().filter(t -> shouldResume(tasks, t, taskType)).collect(Collectors.toList());
+        final Collection<TopicPartition> toPause =
+            restoringChangelogs.stream().filter(t -> shouldPause(tasks, t, taskType)).collect(Collectors.toList());
+        restoreConsumer.resume(toResume);
+        restoreConsumer.pause(toPause);
+    }
+
+    private boolean shouldResume(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
+        final ProcessorStateManager manager = changelogs.get(partition).stateManager;
+        final TaskId taskId = manager.taskId();
+        final Task task = tasks.get(taskId);
+        if (manager.taskType() == taskType) {
+            return task != null;
+        }
+        return false;
+    }
+
+    private boolean shouldPause(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
+        final ProcessorStateManager manager = changelogs.get(partition).stateManager;
+        final TaskId taskId = manager.taskId();
+        final Task task = tasks.get(taskId);
+        if (manager.taskType() == taskType) {
+            return task == null;
+        }
+        return false;
+    }
+
     private void maybeLogRestorationProgress() {
         if (state == ChangelogReaderState.ACTIVE_RESTORING) {
             if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
@@ -633,7 +680,11 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void clearTaskTimeout(final Set<Task> tasks) {
-        tasks.forEach(Task::clearTaskTimeout);
+        tasks.forEach(t -> {
+            if (t != null) {
+                t.clearTaskTimeout();
+            }
+        });
     }
 
     private void maybeInitTaskTimeoutOrThrow(final Set<Task> tasks,
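
The pauseResumePartitions() logic added above rests on the consumer-level pause/resume API, which stops fetching without giving up the partition assignment. A standalone sketch of that underlying mechanism (the consumer and partition collections are assumed to be supplied by the caller):

    import java.util.Collection;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    final class RestorationGate {
        // pause() suppresses fetching on subsequent poll() calls but keeps the
        // assignment, so a later resume() continues from the same position.
        static void gate(final Consumer<byte[], byte[]> restoreConsumer,
                         final Collection<TopicPartition> toResume,
                         final Collection<TopicPartition> toPause) {
            restoreConsumer.resume(toResume);
            restoreConsumer.pause(toPause);
        }
    }
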
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 15e4903e63..7d3be3b1a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -897,7 +897,8 @@ public class StreamThread extends Thread {
         }
         // we can always let changelog reader try restoring in order to initialize the changelogs;
         // if there's no active restoring or standby updating it would not try to fetch any data
-        changelogReader.restore(taskManager.tasks());
+        // After KAFKA-13873, we restore only those tasks that are not paused.
+        changelogReader.restore(taskManager.notPausedTasks());
         log.debug("Idempotent restore call done. Thread state has not changed.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
index f3422537f9..310cdef66e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
@@ -35,21 +35,27 @@ public class TaskExecutionMetadata {
     private static final long CONSTANT_BACKOFF_MS = 5_000L;
 
     private final boolean hasNamedTopologies;
+    private final Set<String> pausedTopologies;
     // map of topologies experiencing errors/currently under backoff
     private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
 
-    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+    public TaskExecutionMetadata(final Set<String> allTopologyNames, final Set<String> pausedTopologies) {
         this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+        this.pausedTopologies = pausedTopologies;
     }
 
     public boolean canProcessTask(final Task task, final long now) {
         final String topologyName = task.id().topologyName();
         if (!hasNamedTopologies) {
             // TODO implement error handling/backoff for non-named topologies (needs KIP)
-            return true;
+            return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
         } else {
-            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
-            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+            if (pausedTopologies.contains(topologyName)) {
+                return false;
+            } else {
+                final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+                return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+            }
         }
     }
 
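Note that TaskExecutionMetadata does not copy pausedTopologies: it keeps a reference to the same set that TopologyMetadata mutates, so a pause becomes visible to canProcessTask() without any further wiring. A tiny sketch of that sharing pattern (the class name is an illustrative stand-in):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class SharedPauseFlag {
        public static void main(final String[] args) {
            // One concurrent set, handed by reference to both the owner and the reader.
            final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();

            pausedTopologies.add("topology1");                           // pauseTopology("topology1")
            System.out.println(pausedTopologies.contains("topology1"));  // true: its tasks are skipped
            pausedTopologies.remove("topology1");                        // resumeTopology("topology1")
            System.out.println(pausedTopologies.contains("topology1"));  // false: processing resumes
        }
    }
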
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index cad03fbd1b..bab8a75149 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -275,7 +275,7 @@ public class TaskExecutor {
     int punctuate() {
         int punctuated = 0;
 
-        for (final Task task : tasks.activeTasks()) {
+        for (final Task task : tasks.notPausedActiveTasks()) {
             try {
                 if (task.maybePunctuateStreamTime()) {
                     punctuated++;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4bc8e4335c..2d13085962 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -115,7 +115,7 @@ public class TaskManager {
         final LogContext logContext = new LogContext(logPrefix);
         this.log = logContext.logger(getClass());
 
-        this.tasks = new Tasks(logContext, topologyMetadata,  streamsMetrics, activeTaskCreator, standbyTaskCreator);
+        this.tasks = new Tasks(logContext, topologyMetadata, activeTaskCreator, standbyTaskCreator);
         this.taskExecutor = new TaskExecutor(
             tasks,
             topologyMetadata.taskExecutionMetadata(),
@@ -1018,6 +1018,11 @@ public class TaskManager {
         return tasks.tasksPerId();
     }
 
+    Map<TaskId, Task> notPausedTasks() {
+        return Collections.unmodifiableMap(tasks.notPausedTasks().stream()
+            .collect(Collectors.toMap(Task::id, v -> v)));
+    }
+
     Map<TaskId, Task> activeTaskMap() {
         return activeTaskStream().collect(Collectors.toMap(Task::id, t -> t));
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index c4aec35d4e..ca5481b67b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -22,14 +22,13 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-
-import java.util.HashSet;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -38,9 +37,8 @@ import java.util.stream.Collectors;
 class Tasks {
     private final Logger log;
     private final TopologyMetadata topologyMetadata;
-    private final StreamsMetricsImpl streamsMetrics;
 
-    private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
+    private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
     private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
     private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
 
@@ -68,14 +66,12 @@ class Tasks {
 
     Tasks(final LogContext logContext,
           final TopologyMetadata topologyMetadata,
-          final StreamsMetricsImpl streamsMetrics,
           final ActiveTaskCreator activeTaskCreator,
           final StandbyTaskCreator standbyTaskCreator) {
 
         log = logContext.logger(getClass());
 
         this.topologyMetadata = topologyMetadata;
-        this.streamsMetrics = streamsMetrics;
         this.activeTaskCreator = activeTaskCreator;
         this.standbyTaskCreator = standbyTaskCreator;
     }
@@ -273,6 +269,20 @@ class Tasks {
         return readOnlyTasks;
     }
 
+    Collection<Task> notPausedActiveTasks() {
+        return new ArrayList<>(readOnlyActiveTasks)
+            .stream()
+            .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
+            .collect(Collectors.toList());
+    }
+
+    Collection<Task> notPausedTasks() {
+        return new ArrayList<>(readOnlyTasks)
+            .stream()
+            .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
+            .collect(Collectors.toList());
+    }
+
     Set<TaskId> activeTaskIds() {
         return readOnlyActiveTaskIds;
     }
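
A note on the Tasks changes above: allTasksPerId is now wrapped in Collections.synchronizedSortedMap, and the new notPaused* accessors copy the read-only views into an ArrayList before streaming, so the paused-topology filter runs over a snapshot rather than a live view; presumably this is what makes the accessors safe to call while the map may be modified concurrently.
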
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 3c0710a847..e3c057b9a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -73,6 +73,7 @@ public class TopologyMetadata {
     private final ProcessingMode processingMode;
     private final TopologyVersion version;
     private final TaskExecutionMetadata taskExecutionMetadata;
+    private final Set<String> pausedTopologies;
 
     private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
 
@@ -104,6 +105,7 @@ public class TopologyMetadata {
         this.processingMode = StreamsConfigUtils.processingMode(config);
         this.config = config;
         this.log = LoggerFactory.getLogger(getClass());
+        this.pausedTopologies = ConcurrentHashMap.newKeySet();
 
         builders = new ConcurrentSkipListMap<>();
         if (builder.hasNamedTopology()) {
@@ -111,7 +113,7 @@ public class TopologyMetadata {
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
-        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
+        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), pausedTopologies);
     }
 
     public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
@@ -120,12 +122,13 @@ public class TopologyMetadata {
         this.processingMode = StreamsConfigUtils.processingMode(config);
         this.config = config;
         this.log = LoggerFactory.getLogger(getClass());
+        this.pausedTopologies = ConcurrentHashMap.newKeySet();
 
         this.builders = builders;
         if (builders.isEmpty()) {
             log.info("Created an empty KafkaStreams app with no topology");
         }
-        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
+        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), pausedTopologies);
     }
 
     // Need to (re)set the log here to pick up the `processId` part of the clientId in the prefix
@@ -257,6 +260,35 @@ public class TopologyMetadata {
         }
     }
 
+    /**
+     * Pauses a topology by name
+     * @param topologyName Name of the topology to pause
+     */
+    public void pauseTopology(final String topologyName) {
+        pausedTopologies.add(topologyName);
+    }
+
+    /**
+     * Checks if a given topology is paused.
+     * @param topologyName Name of the topology to check; if null, the `UNNAMED_TOPOLOGY` is checked.
+     * @return true if the topology is paused.
+     */
+    public boolean isPaused(final String topologyName) {
+        if (topologyName == null) {
+            return pausedTopologies.contains(UNNAMED_TOPOLOGY);
+        } else {
+            return pausedTopologies.contains(topologyName);
+        }
+    }
+
+    /**
+     * Resumes a topology by name
+     * @param topologyName Name of the topology to resume
+     */
+    public void resumeTopology(final String topologyName) {
+        pausedTopologies.remove(topologyName);
+    }
+
     /**
      * Removes the topology and registers a future that listens for all threads on the older version to see the update
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 0e29f4d694..3d22c58337 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -253,6 +253,31 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
         }
     }
 
+    /**
+     * Pauses a topology by name
+     * @param topologyName Name of the topology to pause
+     */
+    public void pauseNamedTopology(final String topologyName) {
+        topologyMetadata.pauseTopology(topologyName);
+    }
+
+    /**
+     * Checks if a given topology is paused.
+     * @param topologyName Name of the topology to check; if null, the `UNNAMED_TOPOLOGY` is checked.
+     * @return true if the topology is paused.
+     */
+    public boolean isNamedTopologyPaused(final String topologyName) {
+        return topologyMetadata.isPaused(topologyName);
+    }
+
+    /**
+     * Resumes a topology by name
+     * @param topologyName Name of the topology to resume
+     */
+    public void resumeNamedTopology(final String topologyName) {
+        topologyMetadata.resumeTopology(topologyName);
+    }
+
     /**
      * @return  true iff the application is still in CREATED and the future was completed
      */
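
The wrapper methods above add per-topology control on top of the instance-wide pause()/resume(); the new integration test below exercises exactly this pattern. In outline (a sketch with illustrative names; the topology wiring is abbreviated):

    final KafkaStreamsNamedTopologyWrapper wrapper = new KafkaStreamsNamedTopologyWrapper(props);
    final NamedTopologyBuilder builder1 = wrapper.newNamedTopologyBuilder("topology1");
    builder1.stream("input-1").to("output-1");
    final NamedTopologyBuilder builder2 = wrapper.newNamedTopologyBuilder("topology2");
    builder2.stream("input-2").to("output-2");

    wrapper.start(java.util.Arrays.asList(builder1.build(), builder2.build()));
    wrapper.pauseNamedTopology("topology1");   // only topology1 stops processing
    // wrapper.isPaused() stays false here: not all topologies are paused
    wrapper.resumeNamedTopology("topology1");
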
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 95574a16ca..b0f0647731 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -55,8 +55,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -91,19 +91,18 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.ExecutionException;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
-
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyLong;
@@ -603,6 +602,45 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void testPauseResume() {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            streams.pause();
+            Assert.assertTrue(streams.isPaused());
+            streams.resume();
+            Assert.assertFalse(streams.isPaused());
+        }
+    }
+
+    @Test
+    public void testStartingPaused() {
+        // This test shows that a KafkaStreams instance can be started "paused"
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.pause();
+            streams.start();
+            Assert.assertTrue(streams.isPaused());
+            streams.resume();
+            Assert.assertFalse(streams.isPaused());
+        }
+    }
+
+    @Test
+    public void testShowPauseResumeAreIdempotent() {
+        // This test shows that pause() and resume() are idempotent operations
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            streams.pause();
+            Assert.assertTrue(streams.isPaused());
+            streams.pause();
+            Assert.assertTrue(streams.isPaused());
+            streams.resume();
+            Assert.assertFalse(streams.isPaused());
+            streams.resume();
+            Assert.assertFalse(streams.isPaused());
+        }
+    }
+
     @Test
     public void shouldAddThreadWhenRunning() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
@@ -791,6 +829,24 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void shouldThrowOnCleanupWhilePaused() throws InterruptedException {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                "Streams never started.");
+
+            streams.pause();
+            waitForCondition(
+                streams::isPaused,
+                "Streams did not pause.");
+
+            assertThrows("Cannot clean up while running.", IllegalStateException.class,
+                streams::cleanUp);
+        }
+    }
+
     @Test
     public void shouldThrowOnCleanupWhileShuttingDown() throws InterruptedException {
         final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
new file mode 100644
index 0000000000..e09ba997b2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({IntegrationTest.class})
+public class PauseResumeIntegrationTest {
+    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static Properties producerConfig;
+    private static Properties consumerConfig;
+
+    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE =
+        Materialized.as(Stores.inMemoryKeyValueStore("store"));
+
+    private static final String INPUT_STREAM_1 = "input-stream-1";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
+    private static final String OUTPUT_STREAM_1 = "output-stream-1";
+    private static final String OUTPUT_STREAM_2 = "output-stream-2";
+    private static final String TOPOLOGY1 = "topology1";
+    private static final String TOPOLOGY2 = "topology2";
+
+    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
+        asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
+        asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 =
+        asList(pair("A", 3L), pair("B", 2L), pair("A", 4L), pair("C", 3L), pair("C", 4L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA_ALL = new ArrayList<KeyValue<String, Long>>() {{
+            addAll(COUNT_OUTPUT_DATA);
+            addAll(COUNT_OUTPUT_DATA2);
+        }};
+
+    private String appId;
+    private KafkaStreams kafkaStreams, kafkaStreams2;
+    private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    @BeforeClass
+    public static void startCluster() throws Exception {
+        CLUSTER.start();
+        producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+            StringSerializer.class, LongSerializer.class);
+        consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
+            StringDeserializer.class, LongDeserializer.class);
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void createTopics() throws InterruptedException {
+        cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
+        appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName);
+    }
+
+    private Properties props() {
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        return properties;
+    }
+
+    @After
+    public void shutdown() throws InterruptedException {
+        for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) {
+            if (streams != null) {
+                streams.close(Duration.ofSeconds(30));
+            }
+        }
+    }
+
+    private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
+    }
+
+    @Test
+    public void shouldPauseAndResumeKafkaStreams() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.start();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.pause();
+        assertTrue(kafkaStreams.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 5);
+
+        kafkaStreams.resume();
+        assertFalse(kafkaStreams.isPaused());
+
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 10);
+    }
+
+    @Test
+    public void shouldAllowForTopologiesToStartPaused() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.pause();
+        kafkaStreams.start();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+        assertTrue(kafkaStreams.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+
+        kafkaStreams.resume();
+        assertFalse(kafkaStreams.isPaused());
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 5);
+    }
+
+    @Test
+    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 5);
+        assertTopicSize(OUTPUT_STREAM_2, 5);
+
+        streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 5);
+        assertTopicSize(OUTPUT_STREAM_2, 10);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
+    }
+
+    @Test
+    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
+
+        streamsNamedTopologyWrapper.pause();
+        assertTrue(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 5);
+        assertTopicSize(OUTPUT_STREAM_2, 5);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 10);
+        assertTopicSize(OUTPUT_STREAM_2, 5);
+    }
+
+    @Test
+    public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+
+        streamsNamedTopologyWrapper.pause();
+        assertTrue(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+        assertTopicSize(OUTPUT_STREAM_2, 5);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        awaitOutput(OUTPUT_STREAM_1, 10, COUNT_OUTPUT_DATA_ALL);
+        assertTopicSize(OUTPUT_STREAM_1, 10);
+        assertTopicSize(OUTPUT_STREAM_2, 5);
+    }
+
+    @Test
+    public void pauseResumeShouldWorkAcrossInstances() throws Exception {
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.pause();
+        kafkaStreams.start();
+
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+        assertTrue(kafkaStreams.isPaused());
+
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
+        kafkaStreams2.pause();
+        kafkaStreams2.start();
+        waitForApplicationState(singletonList(kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
+        assertTrue(kafkaStreams2.isPaused());
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);
+        waitUntilStreamsHasPolled(kafkaStreams2, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+
+        kafkaStreams2.close();
+        kafkaStreams2.cleanUp();
+        waitForApplicationState(singletonList(kafkaStreams2), State.NOT_RUNNING, STARTUP_TIMEOUT);
+
+        kafkaStreams.resume();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+    }
+
+    @Test
+    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+        final Properties properties1 = props();
+        properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        final Properties properties2 = props();
+        properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, properties1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1, properties2);
+        kafkaStreams.start();
+        kafkaStreams2.start();
+
+        waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
+
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.close();
+        kafkaStreams2.close();
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, properties1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1, properties2);
+        kafkaStreams.cleanUp();
+        kafkaStreams2.cleanUp();
+
+        kafkaStreams.pause();
+        kafkaStreams2.pause();
+        kafkaStreams.start();
+        kafkaStreams2.start();
+
+        waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
+
+        assertStreamsLocalStoreLagStaysConstant(kafkaStreams);
+        assertStreamsLocalStoreLagStaysConstant(kafkaStreams2);
+    }
+
+    private void assertStreamsLocalStoreLagStaysConstant(final KafkaStreams streams) throws InterruptedException {
+        waitForCondition(
+            () -> !streams.allLocalStorePartitionLags().isEmpty(),
+            "Lags for local store partitions were not found within the timeout!");
+        waitUntilStreamsHasPolled(streams, 2);
+        final long stateStoreLag1 = streams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
+        waitUntilStreamsHasPolled(streams, 2);
+        final long stateStoreLag2 = streams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
+        assertTrue(stateStoreLag1 > 0);
+        assertEquals(stateStoreLag1, stateStoreLag2);
+    }
+
+    private KafkaStreams buildKafkaStreams(final String outputTopic) {
+        return buildKafkaStreams(outputTopic, props());
+    }
+
+    private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(INPUT_STREAM_1).groupByKey().count(Materialized.as("test-store")).toStream().to(outputTopic);
+        return new KafkaStreams(builder.build(properties), properties);
+    }
+
+    private void assertTopicSize(final String topicName, final int size) {
+        assertEquals(size, getTopicSize(consumerConfig, topicName));
+    }
+
+    private void awaitOutput(final String topicName, final int count, final List<KeyValue<String, Long>> output)
+        throws Exception {
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, topicName, count), CoreMatchers.equalTo(output));
+    }
+
+    private NamedTopologyBuilder getNamedTopologyBuilder1() {
+        final NamedTopologyBuilder builder1 = streamsNamedTopologyWrapper.newNamedTopologyBuilder(TOPOLOGY1);
+        builder1.stream(INPUT_STREAM_1).groupByKey().count().toStream().to(OUTPUT_STREAM_1);
+        return builder1;
+    }
+
+    private NamedTopologyBuilder getNamedTopologyBuilder2() {
+        final NamedTopologyBuilder builder2 = streamsNamedTopologyWrapper.newNamedTopologyBuilder(TOPOLOGY2);
+        builder2.stream(INPUT_STREAM_2)
+            .groupBy((k, v) -> k)
+            .count(IN_MEMORY_STORE)
+            .toStream()
+            .to(OUTPUT_STREAM_2);
+        return builder2;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index a5de09bbf6..31baae96ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1387,6 +1387,41 @@ public class IntegrationTestUtils {
         }
     }
 
+    public static long getTopicSize(final Properties consumerConfig, final String topicName) {
+        long sum = 0;
+        try (final Consumer<Object, Object> consumer = createConsumer(consumerConfig)) {
+            final Collection<TopicPartition> partitions = consumer.partitionsFor(topicName)
+                .stream()
+                .map(info -> new TopicPartition(topicName, info.partition()))
+                .collect(Collectors.toList());
+            final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
+            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
+
+            for (final TopicPartition partition : beginningOffsets.keySet()) {
+                sum += endOffsets.get(partition) - beginningOffsets.get(partition);
+            }
+        }
+        return sum;
+    }
+
+    private static Double getStreamsPollNumber(final KafkaStreams kafkaStreams) {
+        return (Double) kafkaStreams.metrics()
+            .entrySet()
+            .stream()
+            .filter(entry -> entry.getKey().name().equals("poll-total"))
+            .findFirst().get()
+            .getValue()
+            .metricValue();
+    }
+
+    public static void waitUntilStreamsHasPolled(final KafkaStreams kafkaStreams, final int pollNumber)
+        throws InterruptedException {
+        final Double initialCount = getStreamsPollNumber(kafkaStreams);
+        retryOnExceptionWithTimeout(1000, () -> {
+            assertThat(getStreamsPollNumber(kafkaStreams), is(greaterThanOrEqualTo(initialCount + pollNumber)));
+        });
+    }
+
     public static class StableAssignmentListener implements AssignmentListener {
         final AtomicInteger numStableAssignments = new AtomicInteger(0);
         int nextExpectedNumStableAssignments;
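
These two helpers carry the pause tests' negative assertions: rather than sleeping and hoping, waitUntilStreamsHasPolled() blocks until the instance's poll-total metric has advanced by the requested number of polls, at which point getTopicSize() (end offsets minus beginning offsets, summed over partitions) can deterministically show that no new output was produced while paused.
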
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 594fc7e842..a88493ea0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -1092,7 +1092,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         final TaskId taskId = new TaskId(0, 0);
 
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
-        EasyMock.expect(activeStateManager.taskId()).andReturn(taskId);
+        EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
         EasyMock.replay(activeStateManager, storeMetadata, store);
 
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 51151b4f40..74deafd32b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -83,8 +83,6 @@ import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-
-import java.util.function.BiConsumer;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
@@ -92,6 +90,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 
 import java.io.File;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -109,6 +108,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
@@ -121,8 +121,8 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
-import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
@@ -149,6 +149,8 @@ public class StreamThreadTest {
     private final static String APPLICATION_ID = "stream-thread-test";
     private final static UUID PROCESS_ID = UUID.fromString("87bf53a8-54f2-485f-a4b6-acdbec0a8b3d");
     private final static String CLIENT_ID = APPLICATION_ID + "-" + PROCESS_ID;
+    public static final String STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG = "stream-thread-test-count-one-changelog";
+    public static final String STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG = "stream-thread-test-table-two-changelog";
 
     private final int threadIdx = 1;
     private final Metrics metrics = new Metrics();
@@ -1642,9 +1644,9 @@ public class StreamThreadTest {
         final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions(
-            "stream-thread-test-count-one-changelog",
+            STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
             Collections.singletonList(
-                new PartitionInfo("stream-thread-test-count-one-changelog",
+                new PartitionInfo(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
                                   0,
                                   null,
                                   new Node[0],
@@ -1653,7 +1655,7 @@ public class StreamThreadTest {
         );
 
         final HashMap<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
+        offsets.put(new TopicPartition(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 1), 0L);
         restoreConsumer.updateEndOffsets(offsets);
         restoreConsumer.updateBeginningOffsets(offsets);
 
@@ -1686,8 +1688,74 @@ public class StreamThreadTest {
         final String storeName2 = "table-two";
         final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
         final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+
+        setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, false);
+
+        thread.runOnce();
+
+        final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
+        final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
+        assertEquals(task1, standbyTask1.id());
+        assertEquals(task3, standbyTask2.id());
+
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        addStandbyRecordsToRestoreConsumer(restoreConsumer);
+
+        thread.runOnce();
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(4L, store2.approximateNumEntries());
+
+        thread.taskManager().shutdown(true);
+    }
+
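+    // Seed partition 2 of the count-one changelog with 10 records for the active task to restore.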
+    private void addActiveRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) {
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
+                2,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+        }
+    }
+
+    private void addStandbyRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) {
+        // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+        }
+    }
+
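+    // Builds the test topology, seeds changelog partitions and offsets, and hands the thread's
+    // task manager two standby tasks plus, optionally, one active task.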
+    private void setupThread(final String storeName1,
+                             final String storeName2,
+                             final String changelogName1,
+                             final String changelogName2,
+                             final StreamThread thread,
+                             final MockConsumer<byte[], byte[]> restoreConsumer,
+                             final boolean addActiveTask) throws IOException {
+        final TopicPartition activePartition = new TopicPartition(changelogName1, 2);
         final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
         final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
+
         internalStreamsBuilder
             .stream(Collections.singleton(topic1), consumed)
             .groupByKey()
@@ -1697,12 +1765,15 @@ public class StreamThreadTest {
         internalStreamsBuilder.table(topic2, new ConsumedInternal<>(), materialized);
 
         internalStreamsBuilder.buildAndOptimizeTopology();
-        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
-        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions(changelogName1,
             Collections.singletonList(new PartitionInfo(changelogName1, 1, null, new Node[0], new Node[0]))
         );
 
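+        // Seed begin/end offsets for the active task's changelog partition on both the
+        // restore consumer and the admin client so restoration can complete.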
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(activePartition, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(activePartition, 0L));
+        ((MockAdminClient) (thread.adminClient())).updateBeginningOffsets(Collections.singletonMap(activePartition, 0L));
+        ((MockAdminClient) (thread.adminClient())).updateEndOffsets(Collections.singletonMap(activePartition, 10L));
+
         restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
         restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
         restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
@@ -1714,47 +1785,75 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
+        if (addActiveTask) {
+            activeTasks.put(task2, Collections.singleton(t1p2));
+        }
+
         // assign single partition
         standbyTasks.put(task1, Collections.singleton(t1p1));
         standbyTasks.put(task3, Collections.singleton(t2p1));
 
-        thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
+        thread.taskManager().handleAssignment(activeTasks, standbyTasks);
         thread.taskManager().tryToCompleteRestoration(mockTime.milliseconds(), null);
 
         thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotUpdateStandbyTaskWhenPaused() throws Exception {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
+        final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+
+        setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, true);
 
         thread.runOnce();
 
+        final StreamTask activeTask1 = activeTask(thread.taskManager(), t1p2);
         final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
         final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
         assertEquals(task1, standbyTask1.id());
         assertEquals(task3, standbyTask2.id());
 
+        final KeyValueStore<Object, Long> activeStore = (KeyValueStore<Object, Long>) activeTask1.getStore(storeName1);
+
         final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
         final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+        assertEquals(0L, activeStore.approximateNumEntries());
         assertEquals(0L, store1.approximateNumEntries());
         assertEquals(0L, store2.approximateNumEntries());
 
+        // Add records that the active task will restore into its store
+        addActiveRecordsToRestoreConsumer(restoreConsumer);
         // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
-        for (long i = 0L; i < 10L; i++) {
-            restoreConsumer.addRecord(new ConsumerRecord<>(
-                changelogName1,
-                1,
-                i,
-                ("K" + i).getBytes(),
-                ("V" + i).getBytes()));
-            restoreConsumer.addRecord(new ConsumerRecord<>(
-                changelogName2,
-                1,
-                i,
-                ("K" + i).getBytes(),
-                ("V" + i).getBytes()));
-        }
+        addStandbyRecordsToRestoreConsumer(restoreConsumer);
 
+        // Simulate pause
+        thread.taskManager().topologyMetadata().pauseTopology(TopologyMetadata.UNNAMED_TOPOLOGY);
         thread.runOnce();
 
+        assertEquals(0L, activeStore.approximateNumEntries());
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        // Simulate resume
+        thread.taskManager().topologyMetadata().resumeTopology(TopologyMetadata.UNNAMED_TOPOLOGY);
+        thread.runOnce();
+
+        assertEquals(10L, activeStore.approximateNumEntries());
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        thread.runOnce();
+        assertEquals(10L, activeStore.approximateNumEntries());
         assertEquals(10L, store1.approximateNumEntries());
         assertEquals(4L, store2.approximateNumEntries());
 
@@ -3108,6 +3207,15 @@ public class StreamThreadTest {
             Optional.empty()));
     }
 
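+    // Returns the active task reading from the given partition, or null if none is found.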
+    StreamTask activeTask(final TaskManager taskManager, final TopicPartition partition) {
+        final Stream<Task> actives = taskManager.tasks().values().stream().filter(Task::isActive);
+        for (final Task task : (Iterable<Task>) actives::iterator) {
+            if (task.inputPartitions().contains(partition)) {
+                return (StreamTask) task;
+            }
+        }
+        return null;
+    }
+
     StandbyTask standbyTask(final TaskManager taskManager, final TopicPartition partition) {
         final Stream<Task> standbys = taskManager.tasks().values().stream().filter(t -> !t.isActive());
         for (final Task task : (Iterable<Task>) standbys::iterator) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java
new file mode 100644
index 0000000000..7fcb5e30b4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TaskExecutionMetadataTest {
+    final static String TOPOLOGY1 = "topology1";
+    final static String TOPOLOGY2 = "topology2";
+    final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2));
+    final static int TIME_ZERO = 0;
+    final static int CONSTANT_BACKOFF_MS = 5000;
+
+    @Test
+    public void testCanProcessWithoutNamedTopologies() {
+        final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY);
+        final Set<String> pausedTopologies = new HashSet<>();
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies, pausedTopologies);
+
+        final Task mockTask = createMockTask(UNNAMED_TOPOLOGY);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask, TIME_ZERO));
+        // Pausing UNNAMED_TOPOLOGY pauses a KafkaStreams instance that has no named/modular
+        // topologies.
+        pausedTopologies.add(UNNAMED_TOPOLOGY);
+        Assert.assertFalse(metadata.canProcessTask(mockTask, TIME_ZERO));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBePausedIndependently() {
+        final Set<String> pausedTopologies = new HashSet<>();
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES, pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_ZERO));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+
+        pausedTopologies.add(TOPOLOGY1);
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, TIME_ZERO));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+
+        pausedTopologies.remove(TOPOLOGY1);
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_ZERO));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBeStartedPaused() {
+        final Set<String> pausedTopologies = new HashSet<>();
+        pausedTopologies.add(TOPOLOGY1);
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, TIME_ZERO));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+
+        pausedTopologies.remove(TOPOLOGY1);
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_ZERO));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBackoff() {
+        final Set<String> pausedTopologies = new HashSet<>();
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_ZERO));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+
+        metadata.registerTaskError(mockTask1, new Throwable("Error"), TIME_ZERO);
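+        // task1 stays blocked until strictly more than CONSTANT_BACKOFF_MS has elapsed
+        // since the error; task2 is unaffected by task1's backoff.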
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, CONSTANT_BACKOFF_MS - 1));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, CONSTANT_BACKOFF_MS - 1));
+
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, CONSTANT_BACKOFF_MS));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, CONSTANT_BACKOFF_MS));
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, CONSTANT_BACKOFF_MS + 1));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, CONSTANT_BACKOFF_MS + 1));
+    }
+
+    private static Task createMockTask(final String topologyName) {
+        final Task mockTask = mock(Task.class);
+        final TaskId taskId = new TaskId(0, 0, topologyName);
+        when(mockTask.id()).thenReturn(taskId);
+        return mockTask;
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
new file mode 100644
index 0000000000..a44970238a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class TaskExecutorTest {
+    @Test
+    public void testPunctuateWithPause() {
+        final Tasks tasks = mock(Tasks.class);
+        final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class);
+
+        final TaskExecutor taskExecutor =
+            new TaskExecutor(tasks, metadata, ProcessingMode.AT_LEAST_ONCE, false, new LogContext());
+
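+        // punctuate() must only consider active tasks of non-paused topologies and
+        // must never touch the broader not-paused task set.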
+        taskExecutor.punctuate();
+        verify(tasks).notPausedActiveTasks();
+        verify(tasks, never()).notPausedTasks();
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
new file mode 100644
index 0000000000..ad701f8ca4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TasksTest {
+    @Test
+    public void testNotPausedTasks() {
+        final TopologyMetadata topologyMetadata = mock(TopologyMetadata.class);
+        final String unnamedTopologyName = null;
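+        // the first three isPaused() answers simulate a running topology,
+        // the next three simulate it being paused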
+        when(topologyMetadata.isPaused(unnamedTopologyName))
+            .thenReturn(false)
+            .thenReturn(false).thenReturn(false)
+            .thenReturn(true)
+            .thenReturn(true).thenReturn(true);
+
+        final Tasks tasks = new Tasks(
+            new LogContext(),
+            topologyMetadata,
+            mock(ActiveTaskCreator.class),
+            mock(StandbyTaskCreator.class)
+        );
+
+        final TaskId taskId1 = new TaskId(0, 1);
+        final TaskId taskId2 = new TaskId(0, 2);
+
+        final StreamTask streamTask = mock(StreamTask.class);
+        when(streamTask.isActive()).thenReturn(true);
+        when(streamTask.id()).thenReturn(taskId1);
+
+        final StandbyTask standbyTask1 = mock(StandbyTask.class);
+        when(standbyTask1.isActive()).thenReturn(false);
+        when(standbyTask1.id()).thenReturn(taskId2);
+
+        tasks.addTask(streamTask);
+        tasks.addTask(standbyTask1);
+        Assert.assertEquals(1, tasks.notPausedActiveTasks().size());
+        Assert.assertEquals(2, tasks.notPausedTasks().size());
+
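+        // once isPaused() starts returning true, no task should be reported as runnable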
+        Assert.assertEquals(0, tasks.notPausedActiveTasks().size());
+        Assert.assertEquals(0, tasks.notPausedTasks().size());
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TopologyMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TopologyMetadataTest.java
new file mode 100644
index 0000000000..52a103a4d4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TopologyMetadataTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.mock;
+
+public class TopologyMetadataTest {
+    final static String TOPOLOGY1 = "topology1";
+    final static String TOPOLOGY2 = "topology2";
+
+    @Test
+    public void testPauseResume() {
+        final InternalTopologyBuilder internalTopologyBuilder = mock(InternalTopologyBuilder.class);
+        final StreamsConfig config = new DummyStreamsConfig();
+
+        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder,
+            config);
+
+        Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY1));
+        Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY2));
+
+        topologyMetadata.pauseTopology(TOPOLOGY1);
+        Assert.assertTrue(topologyMetadata.isPaused(TOPOLOGY1));
+        Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY2));
+
+        topologyMetadata.resumeTopology(TOPOLOGY1);
+        Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY1));
+        Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY2));
+    }
+}
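
For readers following KIP-834, here is a minimal usage sketch of the public API this
commit introduces (the pause(), resume(), and isPaused() methods on KafkaStreams).
The topology, configuration values, and lifecycle handling below are illustrative
assumptions for the sketch, not part of this commit:

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class PauseResumeExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pause-resume-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");
            final Topology topology = builder.build();

            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();

            // Pause processing: the instance stays live and keeps its task
            // assignment, but active processing and standby restoration stop,
            // as exercised by the tests above.
            streams.pause();

            if (streams.isPaused()) {
                // ... e.g. wait out a downstream outage, then continue
                streams.resume();
            }

            streams.close();
        }
    }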