You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2023/02/21 18:17:18 UTC
[kafka] branch trunk updated: KAFKA-14299: Fix pause and resume with state updater (#13025)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0fc029c6a47 KAFKA-14299: Fix pause and resume with state updater (#13025)
0fc029c6a47 is described below
commit 0fc029c6a47a7a930a2b078569de1173cdb547a4
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Tue Feb 21 19:17:09 2023 +0100
KAFKA-14299: Fix pause and resume with state updater (#13025)
* Fixes required to make the PauseResumeIntegrationTest pass. It was not enabled and it does not pass for the state updater code path.
* Make sure no progress is made on paused topologies. The state updater restored one round of polls from the restore
consumer before realizing that a newly added task was already in paused state when being added.
* Wake up state updater when tasks are being resumed. If a task is resumed, it may be necessary to wake up the state updater from waiting on the tasksAndActions condition.
* Make sure that allTasks methods also return the tasks that are currently being restored.
* Enable PauseResumeIntegrationTest and upgrade it to JUnit5.
Reviewers: Bruno Cadonna <ca...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/streams/KafkaStreams.java | 1 +
.../processor/internals/DefaultStateUpdater.java | 65 ++++++++++----
.../streams/processor/internals/ReadOnlyTask.java | 2 +-
.../streams/processor/internals/StateUpdater.java | 5 ++
.../streams/processor/internals/StreamThread.java | 7 +-
.../streams/processor/internals/TaskManager.java | 35 +++++++-
.../KafkaStreamsNamedTopologyWrapper.java | 2 +
.../integration/PauseResumeIntegrationTest.java | 100 ++++++++++++---------
.../internals/DefaultStateUpdaterTest.java | 1 +
.../processor/internals/ReadOnlyTaskTest.java | 1 +
.../processor/internals/StreamThreadTest.java | 4 +-
.../processor/internals/TaskManagerTest.java | 32 +++++++
12 files changed, 189 insertions(+), 66 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 03b778ab6ec..c05e4c6c1ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1751,6 +1751,7 @@ public class KafkaStreams implements AutoCloseable {
} else {
topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY);
}
+ threads.forEach(StreamThread::signalResume);
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index d96593c5011..ae6618c304f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -116,6 +116,7 @@ public class DefaultStateUpdater implements StateUpdater {
private void runOnce() throws InterruptedException {
performActionsOnTasks();
+ resumeTasks();
restoreTasks();
checkAllUpdatingTaskStates(time.milliseconds());
waitIfAllChangelogsCompletelyRead();
@@ -140,6 +141,16 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
+ private void resumeTasks() {
+ if (isTopologyResumed.compareAndSet(true, false)) {
+ for (final Task task : pausedTasks.values()) {
+ if (!topologyMetadata.isPaused(task.id().topologyName())) {
+ resumeTask(task);
+ }
+ }
+ }
+ }
+
private void restoreTasks() {
try {
changelogReader.restore(updatingTasks);
@@ -229,7 +240,7 @@ public class DefaultStateUpdater implements StateUpdater {
if (isRunning.get() && changelogReader.allChangelogsCompleted()) {
tasksAndActionsLock.lock();
try {
- while (tasksAndActions.isEmpty()) {
+ while (tasksAndActions.isEmpty() && !isTopologyResumed.get()) {
tasksAndActionsCondition.await();
}
} finally {
@@ -258,21 +269,39 @@ public class DefaultStateUpdater implements StateUpdater {
}
private void addTask(final Task task) {
+ final TaskId taskId = task.id();
+
+ Task existingTask = pausedTasks.get(taskId);
+ if (existingTask != null) {
+ throw new IllegalStateException(
+ (existingTask.isActive() ? "Active" : "Standby") + " task " + taskId + " already exist in paused tasks, " +
+ "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. "
+ + BUG_ERROR_MESSAGE);
+ }
+ existingTask = updatingTasks.get(taskId);
+ if (existingTask != null) {
+ throw new IllegalStateException(
+ (existingTask.isActive() ? "Active" : "Standby") + " task " + taskId + " already exist in updating tasks, " +
+ "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. "
+ + BUG_ERROR_MESSAGE);
+ }
+
if (isStateless(task)) {
addToRestoredTasks((StreamTask) task);
- log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+ log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+ } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+ pausedTasks.put(taskId, task);
+ changelogReader.register(task.changelogPartitions(), task.stateManager());
+ log.debug((task.isActive() ? "Active" : "Standby")
+ + " task " + taskId + " was directly added to the paused tasks.");
} else {
- final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
- if (existingTask != null) {
- throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " +
- "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
- }
+ updatingTasks.put(taskId, task);
changelogReader.register(task.changelogPartitions(), task.stateManager());
if (task.isActive()) {
- log.info("Stateful active task " + task.id() + " was added to the state updater");
+ log.info("Stateful active task " + taskId + " was added to the state updater");
changelogReader.enforceRestoreActive();
} else {
- log.info("Standby task " + task.id() + " was added to the state updater");
+ log.info("Standby task " + taskId + " was added to the state updater");
if (updatingTasks.size() == 1) {
changelogReader.transitToUpdateStandby();
}
@@ -388,12 +417,6 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
- for (final Task task : pausedTasks.values()) {
- if (!topologyMetadata.isPaused(task.id().topologyName())) {
- resumeTask(task);
- }
- }
-
lastCommitMs = now;
}
}
@@ -411,6 +434,7 @@ public class DefaultStateUpdater implements StateUpdater {
private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
+ private final AtomicBoolean isTopologyResumed = new AtomicBoolean(false);
private final long commitIntervalMs;
private long lastCommitMs;
@@ -506,6 +530,17 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
+ @Override
+ public void signalResume() {
+ tasksAndActionsLock.lock();
+ try {
+ isTopologyResumed.set(true);
+ tasksAndActionsCondition.signalAll();
+ } finally {
+ tasksAndActionsLock.unlock();
+ }
+ }
+
@Override
public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
final long timeoutMs = timeout.toMillis();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index e26b6ca29fd..ee3989cf62e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -200,7 +200,7 @@ public class ReadOnlyTask implements Task {
@Override
public Map<TopicPartition, Long> changelogOffsets() {
- throw new UnsupportedOperationException("This task is read-only");
+ return task.changelogOffsets();
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 10ac51874d2..3153472bf92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -94,6 +94,11 @@ public interface StateUpdater {
*/
void remove(final TaskId taskId);
+ /**
+ * Wakes up the state updater if it is currently dormant, to check if a paused task should be resumed.
+ */
+ void signalResume();
+
/**
* Drains the restored active tasks from the state updater.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3fb1e2f3cb9..02bd74a027d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1096,6 +1096,11 @@ public class StreamThread extends Thread {
return isAlive();
}
+ // Call method when a topology is resumed
+ public void signalResume() {
+ taskManager.signalResume();
+ }
+
/**
* Try to commit all active tasks owned by this thread.
*
@@ -1113,7 +1118,7 @@ public class StreamThread extends Thread {
}
committed = taskManager.commit(
- taskManager.allTasks()
+ taskManager.allOwnedTasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 2a5b3d2bacf..cf83cb27eba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1115,6 +1115,12 @@ public class TaskManager {
}
}
+ public void signalResume() {
+ if (stateUpdater != null) {
+ stateUpdater.signalResume();
+ }
+ }
+
/**
* Compute the offset total summed across all stores in a task. Includes offset sum for any tasks we own the
* lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}.
@@ -1532,15 +1538,38 @@ public class TaskManager {
}
Map<TaskId, Task> allTasks() {
+ // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
+ // if any outside code modifies the map or the tasks, it would be a severe transgression.
+ if (stateUpdater != null) {
+ final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+ ret.putAll(tasks.allTasksPerId());
+ return ret;
+ } else {
+ return tasks.allTasksPerId();
+ }
+ }
+
+ /**
+ * Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With
+ * state updater enabled, this does not return any tasks currently owned by the state updater.
+ * @return
+ */
+ Map<TaskId, Task> allOwnedTasks() {
// not bothering with an unmodifiable map, since the tasks themselves are mutable, but
// if any outside code modifies the map or the tasks, it would be a severe transgression.
return tasks.allTasksPerId();
}
Set<Task> readOnlyAllTasks() {
- // need to make sure the returned set is unmodifiable as it could be accessed
- // by other thread than the StreamThread owning this task manager;
- return Collections.unmodifiableSet(tasks.allTasks());
+ // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
+ // if any outside code modifies the map or the tasks, it would be a severe transgression.
+ if (stateUpdater != null) {
+ final HashSet<Task> ret = new HashSet<>(stateUpdater.getTasks());
+ ret.addAll(tasks.allTasks());
+ return Collections.unmodifiableSet(ret);
+ } else {
+ return Collections.unmodifiableSet(tasks.allTasks());
+ }
}
Map<TaskId, Task> notPausedTasks() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 4704d1d4df7..dd70ad0abbf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.slf4j.Logger;
@@ -276,6 +277,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
*/
public void resumeNamedTopology(final String topologyName) {
topologyMetadata.resumeTopology(topologyName);
+ threads.forEach(StreamThread::signalResume);
}
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index e09ba997b2e..5e1331bf675 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -35,17 +35,16 @@ import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNa
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.util.ArrayList;
@@ -53,6 +52,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
+import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -69,7 +69,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-@Category({IntegrationTest.class})
+@Tag("integration")
public class PauseResumeIntegrationTest {
private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@@ -101,10 +101,14 @@ public class PauseResumeIntegrationTest {
private KafkaStreams kafkaStreams, kafkaStreams2;
private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
- @Rule
- public final TestName testName = new TestName();
- @BeforeClass
+ private static Stream<Boolean> parameters() {
+ return Stream.of(
+ Boolean.TRUE,
+ Boolean.FALSE);
+ }
+
+ @BeforeAll
public static void startCluster() throws Exception {
CLUSTER.start();
producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
@@ -113,18 +117,18 @@ public class PauseResumeIntegrationTest {
StringDeserializer.class, LongDeserializer.class);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
- @Before
- public void createTopics() throws InterruptedException {
+ @BeforeEach
+ public void createTopics(final TestInfo testInfo) throws InterruptedException {
cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
- appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName);
+ appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testInfo);
}
- private Properties props() {
+ private Properties props(final boolean stateUpdaterEnabled) {
final Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -136,10 +140,11 @@ public class PauseResumeIntegrationTest {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+ properties.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
return properties;
}
- @After
+ @AfterEach
public void shutdown() throws InterruptedException {
for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) {
if (streams != null) {
@@ -152,9 +157,10 @@ public class PauseResumeIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
}
- @Test
- public void shouldPauseAndResumeKafkaStreams() throws Exception {
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldPauseAndResumeKafkaStreams(final boolean stateUpdaterEnabled) throws Exception {
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
@@ -176,9 +182,10 @@ public class PauseResumeIntegrationTest {
assertTopicSize(OUTPUT_STREAM_1, 10);
}
- @Test
- public void shouldAllowForTopologiesToStartPaused() throws Exception {
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
kafkaStreams.pause();
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
@@ -195,9 +202,10 @@ public class PauseResumeIntegrationTest {
assertTopicSize(OUTPUT_STREAM_1, 5);
}
- @Test
- public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -229,9 +237,10 @@ public class PauseResumeIntegrationTest {
awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
}
- @Test
- public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -264,9 +273,10 @@ public class PauseResumeIntegrationTest {
assertTopicSize(OUTPUT_STREAM_2, 5);
}
- @Test
- public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldAllowForNamedTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -306,18 +316,19 @@ public class PauseResumeIntegrationTest {
assertTopicSize(OUTPUT_STREAM_2, 5);
}
- @Test
- public void pauseResumeShouldWorkAcrossInstances() throws Exception {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void pauseResumeShouldWorkAcrossInstances(final boolean stateUpdaterEnabled) throws Exception {
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
kafkaStreams.pause();
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
assertTrue(kafkaStreams.isPaused());
- kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
+ kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, stateUpdaterEnabled);
kafkaStreams2.pause();
kafkaStreams2.start();
waitForApplicationState(singletonList(kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
@@ -337,11 +348,12 @@ public class PauseResumeIntegrationTest {
awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
}
- @Test
- public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
- final Properties properties1 = props();
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void pausedTopologyShouldNotRestoreStateStores(final boolean stateUpdaterEnabled) throws Exception {
+ final Properties properties1 = props(stateUpdaterEnabled);
properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
- final Properties properties2 = props();
+ final Properties properties2 = props(stateUpdaterEnabled);
properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
@@ -385,8 +397,8 @@ public class PauseResumeIntegrationTest {
assertEquals(stateStoreLag1, stateStoreLag2);
}
- private KafkaStreams buildKafkaStreams(final String outputTopic) {
- return buildKafkaStreams(outputTopic, props());
+ private KafkaStreams buildKafkaStreams(final String outputTopic, final boolean stateUpdaterEnabled) {
+ return buildKafkaStreams(outputTopic, props(stateUpdaterEnabled));
}
private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 3e7449f9411..b0c0ba7c156 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -1047,6 +1047,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks();
when(topologyMetadata.isPaused(null)).thenReturn(false);
+ stateUpdater.signalResume();
verifyPausedTasks();
verifyUpdatingTasks(task);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
index cd5da873981..9b780dee6bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
@@ -41,6 +41,7 @@ class ReadOnlyTaskTest {
add("changelogPartitions");
add("commitRequested");
add("isActive");
+ add("changelogOffsets");
add("state");
add("id");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index dc9259a1f91..a8768ddd3eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -2734,7 +2734,7 @@ public class StreamThreadTest {
expect(task3.state()).andReturn(Task.State.CREATED).anyTimes();
expect(task3.id()).andReturn(taskId3).anyTimes();
- expect(taskManager.allTasks()).andReturn(mkMap(
+ expect(taskManager.allOwnedTasks()).andReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2),
mkEntry(taskId3, task3)
@@ -3084,7 +3084,7 @@ public class StreamThreadTest {
expect(runningTask.state()).andStubReturn(Task.State.RUNNING);
expect(runningTask.id()).andStubReturn(taskId);
- expect(taskManager.allTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask));
+ expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask));
expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1);
return taskManager;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 00d44b045b1..f43de372388 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -497,6 +497,38 @@ public class TaskManagerTest {
Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
}
+ @Test
+ public void shouldReturnStateUpdaterTasksInAllTasks() {
+ final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId03Partitions).build();
+ final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId02Partitions).build();
+ final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
+ when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
+ assertEquals(taskManager.allTasks(), mkMap(mkEntry(taskId03, activeTask), mkEntry(taskId02, standbyTask)));
+ }
+
+ @Test
+ public void shouldNotReturnStateUpdaterTasksInOwnedTasks() {
+ final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId03Partitions).build();
+ final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId02Partitions).build();
+ final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
+ when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
+ assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask)));
+ }
+
@Test
public void shouldCreateActiveTaskDuringAssignment() {
final StreamTask activeTaskToBeCreated = statefulTask(taskId03, taskId03ChangelogPartitions)