You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2023/02/21 18:17:18 UTC

[kafka] branch trunk updated: KAFKA-14299: Fix pause and resume with state updater (#13025)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0fc029c6a47 KAFKA-14299: Fix pause and resume with state updater (#13025)
0fc029c6a47 is described below

commit 0fc029c6a47a7a930a2b078569de1173cdb547a4
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Tue Feb 21 19:17:09 2023 +0100

    KAFKA-14299: Fix pause and resume with state updater (#13025)
    
    * Fixes required to make the PauseResumeIntegrationTest pass. It was not enabled, and it did not pass for the state updater code path.
    
    * Make sure no progress is made on paused topologies. The state updater performed one round of polls from the restore
    consumer before realizing that a newly added task was already in a paused state when it was added.
    
    * Wake up state updater when tasks are being resumed. If a task is resumed, it may be necessary to wake up the state updater from waiting on the tasksAndActions condition.
    
    * Make sure that allTasks methods also return the tasks that are currently being restored.
    
    * Enable PauseResumeIntegrationTest and upgrade it to JUnit5.
    
    Reviewers: Bruno Cadonna <ca...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   1 +
 .../processor/internals/DefaultStateUpdater.java   |  65 ++++++++++----
 .../streams/processor/internals/ReadOnlyTask.java  |   2 +-
 .../streams/processor/internals/StateUpdater.java  |   5 ++
 .../streams/processor/internals/StreamThread.java  |   7 +-
 .../streams/processor/internals/TaskManager.java   |  35 +++++++-
 .../KafkaStreamsNamedTopologyWrapper.java          |   2 +
 .../integration/PauseResumeIntegrationTest.java    | 100 ++++++++++++---------
 .../internals/DefaultStateUpdaterTest.java         |   1 +
 .../processor/internals/ReadOnlyTaskTest.java      |   1 +
 .../processor/internals/StreamThreadTest.java      |   4 +-
 .../processor/internals/TaskManagerTest.java       |  32 +++++++
 12 files changed, 189 insertions(+), 66 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 03b778ab6ec..c05e4c6c1ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1751,6 +1751,7 @@ public class KafkaStreams implements AutoCloseable {
         } else {
             topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY);
         }
+        threads.forEach(StreamThread::signalResume);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index d96593c5011..ae6618c304f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -116,6 +116,7 @@ public class DefaultStateUpdater implements StateUpdater {
 
         private void runOnce() throws InterruptedException {
             performActionsOnTasks();
+            resumeTasks();
             restoreTasks();
             checkAllUpdatingTaskStates(time.milliseconds());
             waitIfAllChangelogsCompletelyRead();
@@ -140,6 +141,16 @@ public class DefaultStateUpdater implements StateUpdater {
             }
         }
 
+        private void resumeTasks() {
+            if (isTopologyResumed.compareAndSet(true, false)) {
+                for (final Task task : pausedTasks.values()) {
+                    if (!topologyMetadata.isPaused(task.id().topologyName())) {
+                        resumeTask(task);
+                    }
+                }
+            }
+        }
+
         private void restoreTasks() {
             try {
                 changelogReader.restore(updatingTasks);
@@ -229,7 +240,7 @@ public class DefaultStateUpdater implements StateUpdater {
             if (isRunning.get() && changelogReader.allChangelogsCompleted()) {
                 tasksAndActionsLock.lock();
                 try {
-                    while (tasksAndActions.isEmpty()) {
+                    while (tasksAndActions.isEmpty() && !isTopologyResumed.get()) {
                         tasksAndActionsCondition.await();
                     }
                 } finally {
@@ -258,21 +269,39 @@ public class DefaultStateUpdater implements StateUpdater {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
+
+            Task existingTask = pausedTasks.get(taskId);
+            if (existingTask != null) {
+                throw new IllegalStateException(
+                    (existingTask.isActive() ? "Active" : "Standby") + " task " + taskId + " already exist in paused tasks, " +
+                        "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. "
+                        + BUG_ERROR_MESSAGE);
+            }
+            existingTask = updatingTasks.get(taskId);
+            if (existingTask != null) {
+                throw new IllegalStateException(
+                    (existingTask.isActive() ? "Active" : "Standby") + " task " + taskId + " already exist in updating tasks, " +
+                        "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. "
+                        + BUG_ERROR_MESSAGE);
+            }
+
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);
+                changelogReader.register(task.changelogPartitions(), task.stateManager());
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + taskId + " was directly added to the paused tasks.");
             } else {
-                final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
-                if (existingTask != null) {
-                    throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " +
-                        "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
-                }
+                updatingTasks.put(taskId, task);
                 changelogReader.register(task.changelogPartitions(), task.stateManager());
                 if (task.isActive()) {
-                    log.info("Stateful active task " + task.id() + " was added to the state updater");
+                    log.info("Stateful active task " + taskId + " was added to the state updater");
                     changelogReader.enforceRestoreActive();
                 } else {
-                    log.info("Standby task " + task.id() + " was added to the state updater");
+                    log.info("Standby task " + taskId + " was added to the state updater");
                     if (updatingTasks.size() == 1) {
                         changelogReader.transitToUpdateStandby();
                     }
@@ -388,12 +417,6 @@ public class DefaultStateUpdater implements StateUpdater {
                     }
                 }
 
-                for (final Task task : pausedTasks.values()) {
-                    if (!topologyMetadata.isPaused(task.id().topologyName())) {
-                        resumeTask(task);
-                    }
-                }
-
                 lastCommitMs = now;
             }
         }
@@ -411,6 +434,7 @@ public class DefaultStateUpdater implements StateUpdater {
     private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
     private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
     private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
+    private final AtomicBoolean isTopologyResumed = new AtomicBoolean(false);
 
     private final long commitIntervalMs;
     private long lastCommitMs;
@@ -506,6 +530,17 @@ public class DefaultStateUpdater implements StateUpdater {
         }
     }
 
+    @Override
+    public void signalResume() {
+        tasksAndActionsLock.lock();
+        try {
+            isTopologyResumed.set(true);
+            tasksAndActionsCondition.signalAll();
+        } finally {
+            tasksAndActionsLock.unlock();
+        }
+    }
+
     @Override
     public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
         final long timeoutMs = timeout.toMillis();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index e26b6ca29fd..ee3989cf62e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -200,7 +200,7 @@ public class ReadOnlyTask implements Task {
 
     @Override
     public Map<TopicPartition, Long> changelogOffsets() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.changelogOffsets();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 10ac51874d2..3153472bf92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -94,6 +94,11 @@ public interface StateUpdater {
      */
     void remove(final TaskId taskId);
 
+    /**
+     * Wakes up the state updater if it is currently dormant, to check if a paused task should be resumed.
+     */
+    void signalResume();
+
     /**
      * Drains the restored active tasks from the state updater.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3fb1e2f3cb9..02bd74a027d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1096,6 +1096,11 @@ public class StreamThread extends Thread {
         return isAlive();
     }
 
+    // Call method when a topology is resumed
+    public void signalResume() {
+        taskManager.signalResume();
+    }
+
     /**
      * Try to commit all active tasks owned by this thread.
      *
@@ -1113,7 +1118,7 @@ public class StreamThread extends Thread {
             }
 
             committed = taskManager.commit(
-                taskManager.allTasks()
+                taskManager.allOwnedTasks()
                     .values()
                     .stream()
                     .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 2a5b3d2bacf..cf83cb27eba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1115,6 +1115,12 @@ public class TaskManager {
         }
     }
 
+    public void signalResume() {
+        if (stateUpdater != null) {
+            stateUpdater.signalResume();
+        }
+    }
+
     /**
      * Compute the offset total summed across all stores in a task. Includes offset sum for any tasks we own the
      * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}.
@@ -1532,15 +1538,38 @@ public class TaskManager {
     }
 
     Map<TaskId, Task> allTasks() {
+        // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
+        // if any outside code modifies the map or the tasks, it would be a severe transgression.
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());
+            return ret;
+        } else {
+            return tasks.allTasksPerId();
+        }
+    }
+
+    /**
+     * Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With
+     * state updater enabled, this does not return any tasks currently owned by the state updater.
+     * @return
+     */
+    Map<TaskId, Task> allOwnedTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
         return tasks.allTasksPerId();
     }
 
     Set<Task> readOnlyAllTasks() {
-        // need to make sure the returned set is unmodifiable as it could be accessed
-        // by other thread than the StreamThread owning this task manager;
-        return Collections.unmodifiableSet(tasks.allTasks());
+        // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
+        // if any outside code modifies the map or the tasks, it would be a severe transgression.
+        if (stateUpdater != null) {
+            final HashSet<Task> ret = new HashSet<>(stateUpdater.getTasks());
+            ret.addAll(tasks.allTasks());
+            return Collections.unmodifiableSet(ret);
+        } else {
+            return Collections.unmodifiableSet(tasks.allTasks());
+        }
     }
 
     Map<TaskId, Task> notPausedTasks() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 4704d1d4df7..dd70ad0abbf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.slf4j.Logger;
@@ -276,6 +277,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
      */
     public void resumeNamedTopology(final String topologyName) {
         topologyMetadata.resumeTopology(topologyName);
+        threads.forEach(StreamThread::signalResume);
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index e09ba997b2e..5e1331bf675 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -35,17 +35,16 @@ import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNa
 import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.hamcrest.CoreMatchers;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -53,6 +52,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -69,7 +69,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@Category({IntegrationTest.class})
+@Tag("integration")
 public class PauseResumeIntegrationTest {
     private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@@ -101,10 +101,14 @@ public class PauseResumeIntegrationTest {
     private KafkaStreams kafkaStreams, kafkaStreams2;
     private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
 
-    @Rule
-    public final TestName testName = new TestName();
 
-    @BeforeClass
+    private static Stream<Boolean> parameters() {
+        return Stream.of(
+            Boolean.TRUE,
+            Boolean.FALSE);
+    }
+
+    @BeforeAll
     public static void startCluster() throws Exception {
         CLUSTER.start();
         producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
@@ -113,18 +117,18 @@ public class PauseResumeIntegrationTest {
             StringDeserializer.class, LongDeserializer.class);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-    @Before
-    public void createTopics() throws InterruptedException {
+    @BeforeEach
+    public void createTopics(final TestInfo testInfo) throws InterruptedException {
         cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
-        appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName);
+        appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testInfo);
     }
 
-    private Properties props() {
+    private Properties props(final boolean stateUpdaterEnabled) {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -136,10 +140,11 @@ public class PauseResumeIntegrationTest {
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        properties.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
         return properties;
     }
 
-    @After
+    @AfterEach
     public void shutdown() throws InterruptedException {
         for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) {
             if (streams != null) {
@@ -152,9 +157,10 @@ public class PauseResumeIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
     }
 
-    @Test
-    public void shouldPauseAndResumeKafkaStreams() throws Exception {
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldPauseAndResumeKafkaStreams(final boolean stateUpdaterEnabled) throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
         kafkaStreams.start();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
 
@@ -176,9 +182,10 @@ public class PauseResumeIntegrationTest {
         assertTopicSize(OUTPUT_STREAM_1, 10);
     }
 
-    @Test
-    public void shouldAllowForTopologiesToStartPaused() throws Exception {
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
         kafkaStreams.pause();
         kafkaStreams.start();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
@@ -195,9 +202,10 @@ public class PauseResumeIntegrationTest {
         assertTopicSize(OUTPUT_STREAM_1, 5);
     }
 
-    @Test
-    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
-        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -229,9 +237,10 @@ public class PauseResumeIntegrationTest {
         awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
     }
 
-    @Test
-    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
-        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -264,9 +273,10 @@ public class PauseResumeIntegrationTest {
         assertTopicSize(OUTPUT_STREAM_2, 5);
     }
 
-    @Test
-    public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
-        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldAllowForNamedTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -306,18 +316,19 @@ public class PauseResumeIntegrationTest {
         assertTopicSize(OUTPUT_STREAM_2, 5);
     }
 
-    @Test
-    public void pauseResumeShouldWorkAcrossInstances() throws Exception {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void pauseResumeShouldWorkAcrossInstances(final boolean stateUpdaterEnabled) throws Exception {
         produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
         kafkaStreams.pause();
         kafkaStreams.start();
 
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
         assertTrue(kafkaStreams.isPaused());
 
-        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, stateUpdaterEnabled);
         kafkaStreams2.pause();
         kafkaStreams2.start();
         waitForApplicationState(singletonList(kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
@@ -337,11 +348,12 @@ public class PauseResumeIntegrationTest {
         awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
     }
 
-    @Test
-    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
-        final Properties properties1 = props();
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void pausedTopologyShouldNotRestoreStateStores(final boolean stateUpdaterEnabled) throws Exception {
+        final Properties properties1 = props(stateUpdaterEnabled);
         properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
-        final Properties properties2 = props();
+        final Properties properties2 = props(stateUpdaterEnabled);
         properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
@@ -385,8 +397,8 @@ public class PauseResumeIntegrationTest {
         assertEquals(stateStoreLag1, stateStoreLag2);
     }
 
-    private KafkaStreams buildKafkaStreams(final String outputTopic) {
-        return buildKafkaStreams(outputTopic, props());
+    private KafkaStreams buildKafkaStreams(final String outputTopic, final boolean stateUpdaterEnabled) {
+        return buildKafkaStreams(outputTopic, props(stateUpdaterEnabled));
     }
 
     private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 3e7449f9411..b0c0ba7c156 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -1047,6 +1047,7 @@ class DefaultStateUpdaterTest {
         verifyUpdatingTasks();
 
         when(topologyMetadata.isPaused(null)).thenReturn(false);
+        stateUpdater.signalResume();
 
         verifyPausedTasks();
         verifyUpdatingTasks(task);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
index cd5da873981..9b780dee6bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
@@ -41,6 +41,7 @@ class ReadOnlyTaskTest {
             add("changelogPartitions");
             add("commitRequested");
             add("isActive");
+            add("changelogOffsets");
             add("state");
             add("id");
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index dc9259a1f91..a8768ddd3eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -2734,7 +2734,7 @@ public class StreamThreadTest {
         expect(task3.state()).andReturn(Task.State.CREATED).anyTimes();
         expect(task3.id()).andReturn(taskId3).anyTimes();
 
-        expect(taskManager.allTasks()).andReturn(mkMap(
+        expect(taskManager.allOwnedTasks()).andReturn(mkMap(
             mkEntry(taskId1, task1),
             mkEntry(taskId2, task2),
             mkEntry(taskId3, task3)
@@ -3084,7 +3084,7 @@ public class StreamThreadTest {
 
         expect(runningTask.state()).andStubReturn(Task.State.RUNNING);
         expect(runningTask.id()).andStubReturn(taskId);
-        expect(taskManager.allTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask));
+        expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask));
         expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1);
         return taskManager;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 00d44b045b1..f43de372388 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -497,6 +497,38 @@ public class TaskManagerTest {
         Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
     }
 
+    @Test
+    public void shouldReturnStateUpdaterTasksInAllTasks() {
+        final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId03Partitions).build();
+        final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId02Partitions).build();
+        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
+        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
+        assertEquals(taskManager.allTasks(), mkMap(mkEntry(taskId03, activeTask), mkEntry(taskId02, standbyTask)));
+    }
+
+    @Test
+    public void shouldNotReturnStateUpdaterTasksInOwnedTasks() {
+        final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId03Partitions).build();
+        final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId02Partitions).build();
+        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
+        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
+        assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask)));
+    }
+
     @Test
     public void shouldCreateActiveTaskDuringAssignment() {
         final StreamTask activeTaskToBeCreated = statefulTask(taskId03, taskId03ChangelogPartitions)