You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/06/16 14:06:17 UTC
[kafka] branch trunk updated: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7ed3748a46 KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)
7ed3748a46 is described below
commit 7ed3748a462cd1ce7c30bb9a331b94a3cd79a401
Author: James Hughes <jn...@gmail.com>
AuthorDate: Thu Jun 16 10:06:02 2022 -0400
KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)
This PR adds the ability to pause and resume KafkaStreams instances as well as named/modular topologies (KIP-834).
Co-authored-by: Bruno Cadonna <ca...@apache.org>
Reviewers: Bonnie Varghese <bv...@confluent.io>, Walker Carlson <wc...@confluent.io>, Guozhang Wang <gu...@apache.org>, Bruno Cadonna <ca...@apache.org>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 48 ++-
.../processor/internals/ActiveTaskCreator.java | 2 +-
.../processor/internals/StoreChangelogReader.java | 55 ++-
.../streams/processor/internals/StreamThread.java | 3 +-
.../processor/internals/TaskExecutionMetadata.java | 14 +-
.../streams/processor/internals/TaskExecutor.java | 2 +-
.../streams/processor/internals/TaskManager.java | 7 +-
.../kafka/streams/processor/internals/Tasks.java | 24 +-
.../processor/internals/TopologyMetadata.java | 36 +-
.../KafkaStreamsNamedTopologyWrapper.java | 25 ++
.../org/apache/kafka/streams/KafkaStreamsTest.java | 64 +++-
.../integration/PauseResumeIntegrationTest.java | 422 +++++++++++++++++++++
.../integration/utils/IntegrationTestUtils.java | 35 ++
.../internals/StoreChangelogReaderTest.java | 2 +-
.../processor/internals/StreamThreadTest.java | 154 ++++++--
.../internals/TaskExecutionMetadataTest.java | 124 ++++++
.../processor/internals/TaskExecutorTest.java | 41 ++
.../streams/processor/internals/TasksTest.java | 65 ++++
.../processor/internals/TopologyMetadataTest.java | 49 +++
20 files changed, 1125 insertions(+), 49 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f2c24541a1..033017f0fe 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -225,7 +225,7 @@
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
<suppress checks="JavaNCSS"
- files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
+ files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
<suppress checks="NPathComplexity"
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ed62f89ebc..86ab83f67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -56,7 +56,6 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -65,6 +64,7 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryConfig;
@@ -111,6 +111,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -1735,6 +1736,51 @@ public class KafkaStreams implements AutoCloseable {
return queryableStoreProvider.getStore(storeQueryParameters);
}
+ /**
+ * This method pauses processing for the KafkaStreams instance.
+ *
+ * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+ * Notably, paused topologies will still poll Kafka consumers, and commit offsets.
+ * This method sets transient state that is not maintained or managed among instances.
+ * Note that pause() can be called before start() in order to start a KafkaStreams instance
+ * in a manner where the processing is paused as described, but the consumers are started up.
+ */
+ public void pause() {
+ if (topologyMetadata.hasNamedTopologies()) {
+ for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+ topologyMetadata.pauseTopology(namedTopology.name());
+ }
+ } else {
+ topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
+ }
+ }
+
+ /**
+ * @return true when the KafkaStreams instance has its processing paused.
+ */
+ public boolean isPaused() {
+ if (topologyMetadata.hasNamedTopologies()) {
+ return topologyMetadata.getAllNamedTopologies().stream()
+ .map(NamedTopology::name)
+ .allMatch(topologyMetadata::isPaused);
+ } else {
+ return topologyMetadata.isPaused(UNNAMED_TOPOLOGY);
+ }
+ }
+
+ /**
+ * This method resumes processing for the KafkaStreams instance.
+ */
+ public void resume() {
+ if (topologyMetadata.hasNamedTopologies()) {
+ for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+ topologyMetadata.resumeTopology(namedTopology.name());
+ }
+ } else {
+ topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY);
+ }
+ }
+
/**
* handle each stream thread in a snapshot of threads.
* noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index d90266dbf4..e7832df6b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -151,7 +151,7 @@ class ActiveTaskCreator {
}
// TODO: change return type to `StreamTask`
- Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
+ public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
// TODO: change type to `StreamTask`
final List<Task> createdTasks = new ArrayList<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 756bf11b0a..5c4f0b12e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.slf4j.Logger;
import java.time.Duration;
@@ -429,6 +430,8 @@ public class StoreChangelogReader implements ChangelogReader {
final ConsumerRecords<byte[], byte[]> polledRecords;
try {
+ pauseResumePartitions(tasks, restoringChangelogs);
+
// for restoring active and updating standby we may prefer different poll time
// in order to make sure we call the main consumer#poll in time.
// TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
@@ -463,7 +466,10 @@ public class StoreChangelogReader implements ChangelogReader {
final TaskId taskId = changelogs.get(partition).stateManager.taskId();
try {
if (restoreChangelog(changelogs.get(partition))) {
- tasks.get(taskId).clearTaskTimeout();
+ final Task task = tasks.get(taskId);
+ if (task != null) {
+ task.clearTaskTimeout();
+ }
}
} catch (final TimeoutException timeoutException) {
tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
@@ -479,6 +485,47 @@ public class StoreChangelogReader implements ChangelogReader {
}
}
+ private void pauseResumePartitions(final Map<TaskId, Task> tasks,
+ final Set<TopicPartition> restoringChangelogs) {
+ if (state == ChangelogReaderState.ACTIVE_RESTORING) {
+ updatePartitionsByType(tasks, restoringChangelogs, TaskType.ACTIVE);
+ }
+ if (state == ChangelogReaderState.STANDBY_UPDATING) {
+ updatePartitionsByType(tasks, restoringChangelogs, TaskType.STANDBY);
+ }
+ }
+
+ private void updatePartitionsByType(final Map<TaskId, Task> tasks,
+ final Set<TopicPartition> restoringChangelogs,
+ final TaskType taskType) {
+ final Collection<TopicPartition> toResume =
+ restoringChangelogs.stream().filter(t -> shouldResume(tasks, t, taskType)).collect(Collectors.toList());
+ final Collection<TopicPartition> toPause =
+ restoringChangelogs.stream().filter(t -> shouldPause(tasks, t, taskType)).collect(Collectors.toList());
+ restoreConsumer.resume(toResume);
+ restoreConsumer.pause(toPause);
+ }
+
+ private boolean shouldResume(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
+ final ProcessorStateManager manager = changelogs.get(partition).stateManager;
+ final TaskId taskId = manager.taskId();
+ final Task task = tasks.get(taskId);
+ if (manager.taskType() == taskType) {
+ return task != null;
+ }
+ return false;
+ }
+
+ private boolean shouldPause(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
+ final ProcessorStateManager manager = changelogs.get(partition).stateManager;
+ final TaskId taskId = manager.taskId();
+ final Task task = tasks.get(taskId);
+ if (manager.taskType() == taskType) {
+ return task == null;
+ }
+ return false;
+ }
+
private void maybeLogRestorationProgress() {
if (state == ChangelogReaderState.ACTIVE_RESTORING) {
if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
@@ -633,7 +680,11 @@ public class StoreChangelogReader implements ChangelogReader {
}
private void clearTaskTimeout(final Set<Task> tasks) {
- tasks.forEach(Task::clearTaskTimeout);
+ tasks.forEach(t -> {
+ if (t != null) {
+ t.clearTaskTimeout();
+ }
+ });
}
private void maybeInitTaskTimeoutOrThrow(final Set<Task> tasks,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 15e4903e63..7d3be3b1a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -897,7 +897,8 @@ public class StreamThread extends Thread {
}
// we can always let changelog reader try restoring in order to initialize the changelogs;
// if there's no active restoring or standby updating it would not try to fetch any data
- changelogReader.restore(taskManager.tasks());
+ // After KAFKA-13873, we only restore the not paused tasks.
+ changelogReader.restore(taskManager.notPausedTasks());
log.debug("Idempotent restore call done. Thread state has not changed.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
index f3422537f9..310cdef66e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
@@ -35,21 +35,27 @@ public class TaskExecutionMetadata {
private static final long CONSTANT_BACKOFF_MS = 5_000L;
private final boolean hasNamedTopologies;
+ private final Set<String> pausedTopologies;
// map of topologies experiencing errors/currently under backoff
private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
- public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+ public TaskExecutionMetadata(final Set<String> allTopologyNames, final Set<String> pausedTopologies) {
this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+ this.pausedTopologies = pausedTopologies;
}
public boolean canProcessTask(final Task task, final long now) {
final String topologyName = task.id().topologyName();
if (!hasNamedTopologies) {
// TODO implement error handling/backoff for non-named topologies (needs KIP)
- return true;
+ return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
} else {
- final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
- return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+ if (pausedTopologies.contains(topologyName)) {
+ return false;
+ } else {
+ final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+ return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index cad03fbd1b..bab8a75149 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -275,7 +275,7 @@ public class TaskExecutor {
int punctuate() {
int punctuated = 0;
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.notPausedActiveTasks()) {
try {
if (task.maybePunctuateStreamTime()) {
punctuated++;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4bc8e4335c..2d13085962 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -115,7 +115,7 @@ public class TaskManager {
final LogContext logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
- this.tasks = new Tasks(logContext, topologyMetadata, streamsMetrics, activeTaskCreator, standbyTaskCreator);
+ this.tasks = new Tasks(logContext, topologyMetadata, activeTaskCreator, standbyTaskCreator);
this.taskExecutor = new TaskExecutor(
tasks,
topologyMetadata.taskExecutionMetadata(),
@@ -1018,6 +1018,11 @@ public class TaskManager {
return tasks.tasksPerId();
}
+ Map<TaskId, Task> notPausedTasks() {
+ return Collections.unmodifiableMap(tasks.notPausedTasks().stream()
+ .collect(Collectors.toMap(Task::id, v -> v)));
+ }
+
Map<TaskId, Task> activeTaskMap() {
return activeTaskStream().collect(Collectors.toMap(Task::id, t -> t));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index c4aec35d4e..ca5481b67b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -22,14 +22,13 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-
-import java.util.HashSet;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -38,9 +37,8 @@ import java.util.stream.Collectors;
class Tasks {
private final Logger log;
private final TopologyMetadata topologyMetadata;
- private final StreamsMetricsImpl streamsMetrics;
- private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
+ private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
@@ -68,14 +66,12 @@ class Tasks {
Tasks(final LogContext logContext,
final TopologyMetadata topologyMetadata,
- final StreamsMetricsImpl streamsMetrics,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator) {
log = logContext.logger(getClass());
this.topologyMetadata = topologyMetadata;
- this.streamsMetrics = streamsMetrics;
this.activeTaskCreator = activeTaskCreator;
this.standbyTaskCreator = standbyTaskCreator;
}
@@ -273,6 +269,20 @@ class Tasks {
return readOnlyTasks;
}
+ Collection<Task> notPausedActiveTasks() {
+ return new ArrayList<>(readOnlyActiveTasks)
+ .stream()
+ .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
+ .collect(Collectors.toList());
+ }
+
+ Collection<Task> notPausedTasks() {
+ return new ArrayList<>(readOnlyTasks)
+ .stream()
+ .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
+ .collect(Collectors.toList());
+ }
+
Set<TaskId> activeTaskIds() {
return readOnlyActiveTaskIds;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 3c0710a847..e3c057b9a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -73,6 +73,7 @@ public class TopologyMetadata {
private final ProcessingMode processingMode;
private final TopologyVersion version;
private final TaskExecutionMetadata taskExecutionMetadata;
+ private final Set<String> pausedTopologies;
private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
@@ -104,6 +105,7 @@ public class TopologyMetadata {
this.processingMode = StreamsConfigUtils.processingMode(config);
this.config = config;
this.log = LoggerFactory.getLogger(getClass());
+ this.pausedTopologies = ConcurrentHashMap.newKeySet();
builders = new ConcurrentSkipListMap<>();
if (builder.hasNamedTopology()) {
@@ -111,7 +113,7 @@ public class TopologyMetadata {
} else {
builders.put(UNNAMED_TOPOLOGY, builder);
}
- this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
+ this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), pausedTopologies);
}
public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
@@ -120,12 +122,13 @@ public class TopologyMetadata {
this.processingMode = StreamsConfigUtils.processingMode(config);
this.config = config;
this.log = LoggerFactory.getLogger(getClass());
+ this.pausedTopologies = ConcurrentHashMap.newKeySet();
this.builders = builders;
if (builders.isEmpty()) {
log.info("Created an empty KafkaStreams app with no topology");
}
- this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
+ this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), pausedTopologies);
}
// Need to (re)set the log here to pick up the `processId` part of the clientId in the prefix
@@ -257,6 +260,35 @@ public class TopologyMetadata {
}
}
+ /**
+ * Pauses a topology by name
+ * @param topologyName Name of the topology to pause
+ */
+ public void pauseTopology(final String topologyName) {
+ pausedTopologies.add(topologyName);
+ }
+
+ /**
+ * Checks if a given topology is paused.
+ * @param topologyName If null, assume that we are checking the `UNNAMED_TOPOLOGY`.
+ * @return A boolean indicating if the topology is paused.
+ */
+ public boolean isPaused(final String topologyName) {
+ if (topologyName == null) {
+ return pausedTopologies.contains(UNNAMED_TOPOLOGY);
+ } else {
+ return pausedTopologies.contains(topologyName);
+ }
+ }
+
+ /**
+ * Resumes a topology by name
+ * @param topologyName Name of the topology to resume
+ */
+ public void resumeTopology(final String topologyName) {
+ pausedTopologies.remove(topologyName);
+ }
+
/**
* Removes the topology and registers a future that listens for all threads on the older version to see the update
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 0e29f4d694..3d22c58337 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -253,6 +253,31 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
}
}
+ /**
+ * Pauses a topology by name
+ * @param topologyName Name of the topology to pause
+ */
+ public void pauseNamedTopology(final String topologyName) {
+ topologyMetadata.pauseTopology(topologyName);
+ }
+
+ /**
+ * Checks if a given topology is paused.
+ * @param topologyName If null, assume that we are checking the `UNNAMED_TOPOLOGY`.
+ * @return A boolean indicating if the topology is paused.
+ */
+ public boolean isNamedTopologyPaused(final String topologyName) {
+ return topologyMetadata.isPaused(topologyName);
+ }
+
+ /**
+ * Resumes a topology by name
+ * @param topologyName Name of the topology to resume
+ */
+ public void resumeNamedTopology(final String topologyName) {
+ topologyMetadata.resumeTopology(topologyName);
+ }
+
/**
* @return true iff the application is still in CREATED and the future was completed
*/
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 95574a16ca..b0f0647731 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -55,8 +55,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -91,19 +91,18 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.ExecutionException;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
-
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
@@ -603,6 +602,45 @@ public class KafkaStreamsTest {
}
}
+ @Test
+ public void testPauseResume() {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ streams.start();
+ streams.pause();
+ Assert.assertTrue(streams.isPaused());
+ streams.resume();
+ Assert.assertFalse(streams.isPaused());
+ }
+ }
+
+ @Test
+ public void testStartingPaused() {
+ // This test shows that a KafkaStreams instance can be started "paused"
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ streams.pause();
+ streams.start();
+ Assert.assertTrue(streams.isPaused());
+ streams.resume();
+ Assert.assertFalse(streams.isPaused());
+ }
+ }
+
+ @Test
+ public void testShowPauseResumeAreIdempotent() {
+ // This test shows that pause() and resume() are idempotent: repeated calls leave the paused state unchanged
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ streams.start();
+ streams.pause();
+ Assert.assertTrue(streams.isPaused());
+ streams.pause();
+ Assert.assertTrue(streams.isPaused());
+ streams.resume();
+ Assert.assertFalse(streams.isPaused());
+ streams.resume();
+ Assert.assertFalse(streams.isPaused());
+ }
+ }
+
@Test
public void shouldAddThreadWhenRunning() throws InterruptedException {
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
@@ -791,6 +829,24 @@ public class KafkaStreamsTest {
}
}
+ @Test
+ public void shouldThrowOnCleanupWhilePaused() throws InterruptedException {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ streams.start();
+ waitForCondition(
+ () -> streams.state() == KafkaStreams.State.RUNNING,
+ "Streams never started.");
+
+ streams.pause();
+ waitForCondition(
+ streams::isPaused,
+ "Streams did not pause.");
+
+ assertThrows("Cannot clean up while running.", IllegalStateException.class,
+ streams::cleanUp);
+ }
+ }
+
@Test
public void shouldThrowOnCleanupWhileShuttingDown() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
new file mode 100644
index 0000000000..e09ba997b2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for KIP-834: pausing and resuming whole KafkaStreams
+ * instances as well as individual named topologies.
+ */
+@Category({IntegrationTest.class})
+public class PauseResumeIntegrationTest {
+ private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+ private static Properties producerConfig;
+ private static Properties consumerConfig;
+
+ // Used by topology 2 (see getNamedTopologyBuilder2) so its count store is in-memory.
+ private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE =
+ Materialized.as(Stores.inMemoryKeyValueStore("store"));
+
+ private static final String INPUT_STREAM_1 = "input-stream-1";
+ private static final String INPUT_STREAM_2 = "input-stream-2";
+ private static final String OUTPUT_STREAM_1 = "output-stream-1";
+ private static final String OUTPUT_STREAM_2 = "output-stream-2";
+ private static final String TOPOLOGY1 = "topology1";
+ private static final String TOPOLOGY2 = "topology2";
+
+ // One round of input records; COUNT_OUTPUT_DATA are the expected per-key counts
+ // after the first round, COUNT_OUTPUT_DATA2 after a second identical round.
+ private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
+ asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
+ private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
+ asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));
+ private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 =
+ asList(pair("A", 3L), pair("B", 2L), pair("A", 4L), pair("C", 3L), pair("C", 4L));
+ // NOTE(review): double-brace initialization creates an anonymous ArrayList
+ // subclass; a static initializer block would avoid the extra class.
+ private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA_ALL = new ArrayList<KeyValue<String, Long>>() {{
+ addAll(COUNT_OUTPUT_DATA);
+ addAll(COUNT_OUTPUT_DATA2);
+ }};
+
+ // Instances under test; whichever were created are closed in shutdown().
+ private String appId;
+ private KafkaStreams kafkaStreams, kafkaStreams2;
+ private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
+
+ @Rule
+ public final TestName testName = new TestName();
+
+ @BeforeClass
+ public static void startCluster() throws Exception {
+ // One broker cluster plus String/Long producer and consumer configs shared by all tests.
+ CLUSTER.start();
+ producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class, LongSerializer.class);
+ consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
+ StringDeserializer.class, LongDeserializer.class);
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ @Before
+ public void createTopics() throws InterruptedException {
+ // Fresh topics and a per-test application id so tests do not share state.
+ cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
+ appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName);
+ }
+
+ private Properties props() {
+ // Streams config tuned for fast commits/rebalances so pause/resume effects show quickly.
+ final Properties properties = new Properties();
+ properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+ properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+ properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+ properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+ return properties;
+ }
+
+ @After
+ public void shutdown() throws InterruptedException {
+ // Close whichever instances a test created; null entries are simply skipped.
+ for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) {
+ if (streams != null) {
+ streams.close(Duration.ofSeconds(30));
+ }
+ }
+ }
+
+ private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) {
+ IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
+ }
+
+ @Test
+ public void shouldPauseAndResumeKafkaStreams() throws Exception {
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+ kafkaStreams.start();
+ waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+ kafkaStreams.pause();
+ assertTrue(kafkaStreams.isPaused());
+
+ // input produced while paused must not be processed ...
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+ // ... let the paused instance complete a couple of polls to prove it stays idle
+ waitUntilStreamsHasPolled(kafkaStreams, 2);
+ assertTopicSize(OUTPUT_STREAM_1, 5);
+
+ kafkaStreams.resume();
+ assertFalse(kafkaStreams.isPaused());
+
+ // after resume the buffered input is processed and counts continue from round one
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
+ assertTopicSize(OUTPUT_STREAM_1, 10);
+ }
+
+ @Test
+ public void shouldAllowForTopologiesToStartPaused() throws Exception {
+ // pause() is called before start(): the instance must come up RUNNING but idle.
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+ kafkaStreams.pause();
+ kafkaStreams.start();
+ waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+ assertTrue(kafkaStreams.isPaused());
+
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+ // two idle polls while paused, then verify nothing reached the output topic
+ waitUntilStreamsHasPolled(kafkaStreams, 2);
+ assertTopicSize(OUTPUT_STREAM_1, 0);
+
+ kafkaStreams.resume();
+ assertFalse(kafkaStreams.isPaused());
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+ assertTopicSize(OUTPUT_STREAM_1, 5);
+ }
+
+ @Test
+ public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
+ // Pausing one named topology must stop only that topology; the sibling keeps processing.
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+ final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+ final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+ streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+ waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+ produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+ awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
+ assertTopicSize(OUTPUT_STREAM_1, 5);
+ assertTopicSize(OUTPUT_STREAM_2, 5);
+
+ // pause topology 1 only; the instance as a whole does not report paused
+ streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
+ assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+ assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+ assertFalse(streamsNamedTopologyWrapper.isPaused());
+
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+ produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+ // topology 2 keeps producing output while topology 1's output stays at 5 records
+ awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA2);
+ assertTopicSize(OUTPUT_STREAM_1, 5);
+ assertTopicSize(OUTPUT_STREAM_2, 10);
+
+ streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+ assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
+ }
+
+ @Test
+ public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
+ // Instance-level pause() must pause every named topology; resuming one topology
+ // clears the instance-level paused flag but leaves the other topology paused.
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+ final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+ final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+ streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+ waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+ produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+ awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
+
+ streamsNamedTopologyWrapper.pause();
+ assertTrue(streamsNamedTopologyWrapper.isPaused());
+ assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+ assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+ produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+ // while everything is paused, neither output topic may grow
+ waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2);
+ assertTopicSize(OUTPUT_STREAM_1, 5);
+ assertTopicSize(OUTPUT_STREAM_2, 5);
+
+ streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+ assertFalse(streamsNamedTopologyWrapper.isPaused());
+ assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+ assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
+ assertTopicSize(OUTPUT_STREAM_1, 10);
+ assertTopicSize(OUTPUT_STREAM_2, 5);
+ }
+
+ @Test
+ public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
+ // A named topology may be paused before start(); it must then stay idle while the
+ // other topology processes, and catch up on all buffered input once resumed.
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+ final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+ final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+ streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
+ streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+ waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+ assertFalse(streamsNamedTopologyWrapper.isPaused());
+ assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+ assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+ produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+ // only topology 2 produces output; topology 1 started paused
+ awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
+ assertTopicSize(OUTPUT_STREAM_1, 0);
+
+ streamsNamedTopologyWrapper.pause();
+ assertTrue(streamsNamedTopologyWrapper.isPaused());
+ assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+ assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+ produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+ waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2);
+ assertTopicSize(OUTPUT_STREAM_1, 0);
+ assertTopicSize(OUTPUT_STREAM_2, 5);
+
+ streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+ assertFalse(streamsNamedTopologyWrapper.isPaused());
+ assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+ assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+ // topology 1 now processes both rounds of input it missed while paused
+ awaitOutput(OUTPUT_STREAM_1, 10, COUNT_OUTPUT_DATA_ALL);
+ assertTopicSize(OUTPUT_STREAM_1, 10);
+ assertTopicSize(OUTPUT_STREAM_2, 5);
+ }
+
+ @Test
+ public void pauseResumeShouldWorkAcrossInstances() throws Exception {
+ // Two instances of the same application start paused; after the second shuts down,
+ // resuming the first must let it take over and process the input on its own.
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+ kafkaStreams.pause();
+ kafkaStreams.start();
+
+ waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+ assertTrue(kafkaStreams.isPaused());
+
+ kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
+ kafkaStreams2.pause();
+ kafkaStreams2.start();
+ waitForApplicationState(singletonList(kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
+ assertTrue(kafkaStreams2.isPaused());
+
+ // both instances idle: no output despite the pre-produced input
+ waitUntilStreamsHasPolled(kafkaStreams, 2);
+ waitUntilStreamsHasPolled(kafkaStreams2, 2);
+ assertTopicSize(OUTPUT_STREAM_1, 0);
+
+ kafkaStreams2.close();
+ kafkaStreams2.cleanUp();
+ waitForApplicationState(singletonList(kafkaStreams2), State.NOT_RUNNING, STARTUP_TIMEOUT);
+
+ kafkaStreams.resume();
+ waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+ }
+
+ @Test
+ public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+ // After processing once and wiping local state, instances restarted in the paused
+ // state must not restore their stores: the local store lag has to stay constant.
+ final Properties properties1 = props();
+ properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+ final Properties properties2 = props();
+ properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+ produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, properties1);
+ kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1, properties2);
+ kafkaStreams.start();
+ kafkaStreams2.start();
+
+ waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
+
+ awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+ kafkaStreams.close();
+ kafkaStreams2.close();
+
+ // wipe local state so a restart would normally trigger changelog restoration
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, properties1);
+ kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1, properties2);
+ kafkaStreams.cleanUp();
+ kafkaStreams2.cleanUp();
+
+ kafkaStreams.pause();
+ kafkaStreams2.pause();
+ kafkaStreams.start();
+ kafkaStreams2.start();
+
+ // paused instances stay in REBALANCING because restoration never completes
+ waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
+
+ assertStreamsLocalStoreLagStaysConstant(kafkaStreams);
+ assertStreamsLocalStoreLagStaysConstant(kafkaStreams2);
+ }
+
+ private void assertStreamsLocalStoreLagStaysConstant(final KafkaStreams streams) throws InterruptedException {
+ // Samples the "test-store" partition-0 lag twice, separated by at least two poll
+ // cycles, and asserts it is positive and unchanged — i.e. no restoration progress.
+ waitForCondition(
+ () -> !streams.allLocalStorePartitionLags().isEmpty(),
+ "Lags for local store partitions were not found within the timeout!");
+ waitUntilStreamsHasPolled(streams, 2);
+ final long stateStoreLag1 = streams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
+ waitUntilStreamsHasPolled(streams, 2);
+ final long stateStoreLag2 = streams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
+ assertTrue(stateStoreLag1 > 0);
+ assertEquals(stateStoreLag1, stateStoreLag2);
+ }
+
+ // Builds a single-store counting topology reading INPUT_STREAM_1, using default props.
+ private KafkaStreams buildKafkaStreams(final String outputTopic) {
+ return buildKafkaStreams(outputTopic, props());
+ }
+
+ // Builds a KafkaStreams instance that counts INPUT_STREAM_1 by key into "test-store"
+ // and writes the running counts to the given output topic.
+ private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream(INPUT_STREAM_1).groupByKey().count(Materialized.as("test-store")).toStream().to(outputTopic);
+ return new KafkaStreams(builder.build(properties), properties);
+ }
+
+ // Asserts the topic currently holds exactly `size` records. Note the JUnit
+ // convention assertEquals(expected, actual) — expected value goes first so
+ // failure messages read correctly.
+ private void assertTopicSize(final String topicName, final int size) {
+ assertEquals(size, getTopicSize(consumerConfig, topicName));
+ }
+
+ // Waits until at least `count` records arrive on the topic and asserts they equal `output`.
+ private void awaitOutput(final String topicName, final int count, final List<KeyValue<String, Long>> output)
+ throws Exception {
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, topicName, count), CoreMatchers.equalTo(output));
+ }
+
+ // Topology 1: count INPUT_STREAM_1 by key (default store) into OUTPUT_STREAM_1.
+ private NamedTopologyBuilder getNamedTopologyBuilder1() {
+ final NamedTopologyBuilder builder1 = streamsNamedTopologyWrapper.newNamedTopologyBuilder(TOPOLOGY1);
+ builder1.stream(INPUT_STREAM_1).groupByKey().count().toStream().to(OUTPUT_STREAM_1);
+ return builder1;
+ }
+
+ // Topology 2: same counting shape but re-grouped and backed by an in-memory store,
+ // reading INPUT_STREAM_2 and writing to OUTPUT_STREAM_2.
+ private NamedTopologyBuilder getNamedTopologyBuilder2() {
+ final NamedTopologyBuilder builder2 = streamsNamedTopologyWrapper.newNamedTopologyBuilder(TOPOLOGY2);
+ builder2.stream(INPUT_STREAM_2)
+ .groupBy((k, v) -> k)
+ .count(IN_MEMORY_STORE)
+ .toStream()
+ .to(OUTPUT_STREAM_2);
+ return builder2;
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index a5de09bbf6..31baae96ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1387,6 +1387,41 @@ public class IntegrationTestUtils {
}
}
+ /**
+ * Returns the number of records currently readable in the topic: the sum of
+ * (endOffset - beginningOffset) over all of its partitions.
+ */
+ public static long getTopicSize(final Properties consumerConfig, final String topicName) {
+ long sum = 0;
+ try (final Consumer<Object, Object> consumer = createConsumer(consumerConfig)) {
+ final Collection<TopicPartition> partitions = consumer.partitionsFor(topicName)
+ .stream()
+ .map(info -> new TopicPartition(topicName, info.partition()))
+ .collect(Collectors.toList());
+ final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
+ final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
+
+ for (final TopicPartition partition : beginningOffsets.keySet()) {
+ sum += endOffsets.get(partition) - beginningOffsets.get(partition);
+ }
+ }
+ return sum;
+ }
+
+ // Reads the cumulative "poll-total" metric from the instance. Uses orElseThrow
+ // instead of a bare Optional.get() so a missing metric fails with a clear message
+ // rather than an anonymous NoSuchElementException.
+ private static Double getStreamsPollNumber(final KafkaStreams kafkaStreams) {
+ return (Double) kafkaStreams.metrics()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().name().equals("poll-total"))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Metric poll-total was not found for the given KafkaStreams instance"))
+ .getValue()
+ .metricValue();
+ }
+
+ /**
+ * Blocks until the instance's consumer has completed at least {@code pollNumber}
+ * additional polls beyond the count observed on entry.
+ */
+ public static void waitUntilStreamsHasPolled(final KafkaStreams kafkaStreams, final int pollNumber)
+ throws InterruptedException {
+ final Double initialCount = getStreamsPollNumber(kafkaStreams);
+ retryOnExceptionWithTimeout(1000, () -> {
+ assertThat(getStreamsPollNumber(kafkaStreams), is(greaterThanOrEqualTo(initialCount + pollNumber)));
+ });
+ }
+
public static class StableAssignmentListener implements AssignmentListener {
final AtomicInteger numStableAssignments = new AtomicInteger(0);
int nextExpectedNumStableAssignments;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 594fc7e842..a88493ea0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -1092,7 +1092,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
- EasyMock.expect(activeStateManager.taskId()).andReturn(taskId);
+ EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
EasyMock.replay(activeStateManager, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 51151b4f40..74deafd32b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -83,8 +83,6 @@ import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
-
-import java.util.function.BiConsumer;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
@@ -92,6 +90,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import java.io.File;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -109,6 +108,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
@@ -121,8 +121,8 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
-import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
@@ -149,6 +149,8 @@ public class StreamThreadTest {
private final static String APPLICATION_ID = "stream-thread-test";
private final static UUID PROCESS_ID = UUID.fromString("87bf53a8-54f2-485f-a4b6-acdbec0a8b3d");
private final static String CLIENT_ID = APPLICATION_ID + "-" + PROCESS_ID;
+ public static final String STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG = "stream-thread-test-count-one-changelog";
+ public static final String STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG = "stream-thread-test-table-two-changelog";
private final int threadIdx = 1;
private final Metrics metrics = new Metrics();
@@ -1642,9 +1644,9 @@ public class StreamThreadTest {
final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
restoreConsumer.updatePartitions(
- "stream-thread-test-count-one-changelog",
+ STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
Collections.singletonList(
- new PartitionInfo("stream-thread-test-count-one-changelog",
+ new PartitionInfo(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
0,
null,
new Node[0],
@@ -1653,7 +1655,7 @@ public class StreamThreadTest {
);
final HashMap<TopicPartition, Long> offsets = new HashMap<>();
- offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
+ offsets.put(new TopicPartition(STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, 1), 0L);
restoreConsumer.updateEndOffsets(offsets);
restoreConsumer.updateBeginningOffsets(offsets);
@@ -1686,8 +1688,74 @@ public class StreamThreadTest {
final String storeName2 = "table-two";
final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+ final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+ final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+
+ setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, false);
+
+ thread.runOnce();
+
+ final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
+ final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
+ assertEquals(task1, standbyTask1.id());
+ assertEquals(task3, standbyTask2.id());
+
+ final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+ final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+ assertEquals(0L, store1.approximateNumEntries());
+ assertEquals(0L, store2.approximateNumEntries());
+
+ addStandbyRecordsToRestoreConsumer(restoreConsumer);
+
+ thread.runOnce();
+
+ assertEquals(10L, store1.approximateNumEntries());
+ assertEquals(4L, store2.approximateNumEntries());
+
+ thread.taskManager().shutdown(true);
+ }
+
+ // Feeds 10 changelog records into partition 2 of the count-one changelog, which
+ // is the partition assigned to the ACTIVE task in these tests.
+ private void addActiveRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) {
+ for (long i = 0L; i < 10L; i++) {
+ restoreConsumer.addRecord(new ConsumerRecord<>(
+ STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
+ 2,
+ i,
+ ("K" + i).getBytes(),
+ ("V" + i).getBytes()));
+ }
+ }
+
+ // Feeds 10 changelog records into partition 1 of both changelogs, consumed by
+ // the STANDBY tasks.
+ private void addStandbyRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) {
+ // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
+ for (long i = 0L; i < 10L; i++) {
+ restoreConsumer.addRecord(new ConsumerRecord<>(
+ STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
+ 1,
+ i,
+ ("K" + i).getBytes(),
+ ("V" + i).getBytes()));
+ restoreConsumer.addRecord(new ConsumerRecord<>(
+ STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG,
+ 1,
+ i,
+ ("K" + i).getBytes(),
+ ("V" + i).getBytes()));
+ }
+ }
+
+ private void setupThread(final String storeName1,
+ final String storeName2,
+ final String changelogName1,
+ final String changelogName2,
+ final StreamThread thread,
+ final MockConsumer<byte[], byte[]> restoreConsumer,
+ final boolean addActiveTask) throws IOException {
+ final TopicPartition activePartition = new TopicPartition(changelogName1, 2);
final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
+
internalStreamsBuilder
.stream(Collections.singleton(topic1), consumed)
.groupByKey()
@@ -1697,12 +1765,15 @@ public class StreamThreadTest {
internalStreamsBuilder.table(topic2, new ConsumedInternal<>(), materialized);
internalStreamsBuilder.buildAndOptimizeTopology();
- final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
- final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
restoreConsumer.updatePartitions(changelogName1,
Collections.singletonList(new PartitionInfo(changelogName1, 1, null, new Node[0], new Node[0]))
);
+ restoreConsumer.updateEndOffsets(Collections.singletonMap(activePartition, 10L));
+ restoreConsumer.updateBeginningOffsets(Collections.singletonMap(activePartition, 0L));
+ ((MockAdminClient) (thread.adminClient())).updateBeginningOffsets(Collections.singletonMap(activePartition, 0L));
+ ((MockAdminClient) (thread.adminClient())).updateEndOffsets(Collections.singletonMap(activePartition, 10L));
+
restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
@@ -1714,47 +1785,75 @@ public class StreamThreadTest {
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+ if (addActiveTask) {
+ activeTasks.put(task2, Collections.singleton(t1p2));
+ }
+
// assign single partition
standbyTasks.put(task1, Collections.singleton(t1p1));
standbyTasks.put(task3, Collections.singleton(t2p1));
- thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
+ thread.taskManager().handleAssignment(activeTasks, standbyTasks);
thread.taskManager().tryToCompleteRestoration(mockTime.milliseconds(), null);
thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldNotUpdateStandbyTaskWhenPaused() throws Exception {
+ final String storeName1 = "count-one";
+ final String storeName2 = "table-two";
+ final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
+ final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+ final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+ final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+
+ setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, true);
thread.runOnce();
+ final StreamTask activeTask1 = activeTask(thread.taskManager(), t1p2);
final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
assertEquals(task1, standbyTask1.id());
assertEquals(task3, standbyTask2.id());
+ final KeyValueStore<Object, Long> activeStore = (KeyValueStore<Object, Long>) activeTask1.getStore(storeName1);
+
final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+ assertEquals(0L, activeStore.approximateNumEntries());
assertEquals(0L, store1.approximateNumEntries());
assertEquals(0L, store2.approximateNumEntries());
+ // Add some records that the active task would handle
+ addActiveRecordsToRestoreConsumer(restoreConsumer);
// let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
- for (long i = 0L; i < 10L; i++) {
- restoreConsumer.addRecord(new ConsumerRecord<>(
- changelogName1,
- 1,
- i,
- ("K" + i).getBytes(),
- ("V" + i).getBytes()));
- restoreConsumer.addRecord(new ConsumerRecord<>(
- changelogName2,
- 1,
- i,
- ("K" + i).getBytes(),
- ("V" + i).getBytes()));
- }
+ addStandbyRecordsToRestoreConsumer(restoreConsumer);
+ // Simulate pause
+ thread.taskManager().topologyMetadata().pauseTopology(TopologyMetadata.UNNAMED_TOPOLOGY);
thread.runOnce();
+ assertEquals(0L, activeStore.approximateNumEntries());
+ assertEquals(0L, store1.approximateNumEntries());
+ assertEquals(0L, store2.approximateNumEntries());
+
+ // Simulate resume
+ thread.taskManager().topologyMetadata().resumeTopology(TopologyMetadata.UNNAMED_TOPOLOGY);
+ thread.runOnce();
+
+ assertEquals(10L, activeStore.approximateNumEntries());
+ assertEquals(0L, store1.approximateNumEntries());
+ assertEquals(0L, store2.approximateNumEntries());
+
+ thread.runOnce();
+ assertEquals(10L, activeStore.approximateNumEntries());
assertEquals(10L, store1.approximateNumEntries());
assertEquals(4L, store2.approximateNumEntries());
@@ -3108,6 +3207,15 @@ public class StreamThreadTest {
Optional.empty()));
}
+ StreamTask activeTask(final TaskManager taskManager, final TopicPartition partition) {
+ final Stream<Task> standbys = taskManager.tasks().values().stream().filter(t -> t.isActive());
+ for (final Task task : (Iterable<Task>) standbys::iterator) {
+ if (task.inputPartitions().contains(partition)) {
+ return (StreamTask) task;
+ }
+ }
+ return null;
+ }
StandbyTask standbyTask(final TaskManager taskManager, final TopicPartition partition) {
final Stream<Task> standbys = taskManager.tasks().values().stream().filter(t -> !t.isActive());
for (final Task task : (Iterable<Task>) standbys::iterator) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java
new file mode 100644
index 0000000000..7fcb5e30b4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TaskExecutionMetadataTest {
+ final static String TOPOLOGY1 = "topology1";
+ final static String TOPOLOGY2 = "topology2";
+ final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2));
+ final static int TIME_ZERO = 0;
+ final static int CONSTANT_BACKOFF_MS = 5000;
+
+ @Test
+ public void testCanProcessWithoutNamedTopologies() {
+ final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY);
+ final Set<String> pausedTopologies = new HashSet<>();
+
+ final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies, pausedTopologies);
+
+ final Task mockTask = createMockTask(UNNAMED_TOPOLOGY);
+
+ Assert.assertTrue(metadata.canProcessTask(mockTask, TIME_ZERO));
+ // This pauses an UNNAMED_TOPOLOGY / a KafkaStreams instance without named/modular
+ // topologies.
+ pausedTopologies.add(UNNAMED_TOPOLOGY);
+ Assert.assertFalse(metadata.canProcessTask(mockTask, TIME_ZERO));
+ }
+
+ @Test
+ public void testNamedTopologiesCanBePausedIndependently() {
+ final Set<String> pausedTopologies = new HashSet<>();
+ final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES, pausedTopologies);
+
+ final Task mockTask1 = createMockTask(TOPOLOGY1);
+ final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+ Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_ZERO));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+
+ pausedTopologies.add(TOPOLOGY1);
+ Assert.assertFalse(metadata.canProcessTask(mockTask1, TIME_ZERO));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+
+ pausedTopologies.remove(TOPOLOGY1);
+ Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_ZERO));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+ }
+
+ @Test
+ public void testNamedTopologiesCanBeStartedPaused() {
+ final Set<String> pausedTopologies = new HashSet<>();
+ pausedTopologies.add(TOPOLOGY1);
+
+ final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+ pausedTopologies);
+
+ final Task mockTask1 = createMockTask(TOPOLOGY1);
+ final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+ Assert.assertFalse(metadata.canProcessTask(mockTask1, TIME_ZERO));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+
+ pausedTopologies.remove(TOPOLOGY1);
+ Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_ZERO));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+ }
+
+ @Test
+ public void testNamedTopologiesCanBackoff() {
+ final Set<String> pausedTopologies = new HashSet<>();
+
+ final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+ pausedTopologies);
+
+ final Task mockTask1 = createMockTask(TOPOLOGY1);
+ final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+ Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_ZERO));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_ZERO));
+
+ metadata.registerTaskError(mockTask1, new Throwable("Error"), TIME_ZERO);
+ Assert.assertFalse(metadata.canProcessTask(mockTask1, CONSTANT_BACKOFF_MS - 1));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, CONSTANT_BACKOFF_MS - 1));
+
+ Assert.assertFalse(metadata.canProcessTask(mockTask1, CONSTANT_BACKOFF_MS));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, CONSTANT_BACKOFF_MS));
+
+ Assert.assertTrue(metadata.canProcessTask(mockTask1, CONSTANT_BACKOFF_MS + 1));
+ Assert.assertTrue(metadata.canProcessTask(mockTask2, CONSTANT_BACKOFF_MS + 1));
+ }
+
+ private static Task createMockTask(final String topologyName) {
+ final Task mockTask = mock(Task.class);
+ final TaskId taskId = new TaskId(0, 0, topologyName);
+ when(mockTask.id()).thenReturn(taskId);
+ return mockTask;
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
new file mode 100644
index 0000000000..a44970238a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class TaskExecutorTest {
+ @Test
+ public void testPunctuateWithPause() {
+ final Tasks tasks = mock(Tasks.class);
+ final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class);
+
+ final TaskExecutor taskExecutor =
+ new TaskExecutor(tasks, metadata, ProcessingMode.AT_LEAST_ONCE, false, new LogContext());
+
+ taskExecutor.punctuate();
+ verify(tasks).notPausedActiveTasks();
+ verify(tasks, never()).notPausedTasks();
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
new file mode 100644
index 0000000000..ad701f8ca4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TasksTest {
+ @Test
+ public void testNotPausedTasks() {
+ final TopologyMetadata topologyMetadata = mock(TopologyMetadata.class);
+ final String unnamedTopologyName = null;
+ when(topologyMetadata.isPaused(unnamedTopologyName))
+ .thenReturn(false)
+ .thenReturn(false).thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(true).thenReturn(true);
+
+ final Tasks tasks = new Tasks(
+ new LogContext(),
+ topologyMetadata,
+ mock(ActiveTaskCreator.class),
+ mock(StandbyTaskCreator.class)
+ );
+
+ final TaskId taskId1 = new TaskId(0, 1);
+ final TaskId taskId2 = new TaskId(0, 2);
+
+ final StreamTask streamTask = mock(StreamTask.class);
+ when(streamTask.isActive()).thenReturn(true);
+ when(streamTask.id()).thenReturn(taskId1);
+
+ final StandbyTask standbyTask1 = mock(StandbyTask.class);
+ when(standbyTask1.isActive()).thenReturn(false);
+ when(standbyTask1.id()).thenReturn(taskId2);
+
+ tasks.addTask(streamTask);
+ tasks.addTask(standbyTask1);
+ Assert.assertEquals(tasks.notPausedActiveTasks().size(), 1);
+ Assert.assertEquals(tasks.notPausedTasks().size(), 2);
+
+ Assert.assertEquals(tasks.notPausedActiveTasks().size(), 0);
+ Assert.assertEquals(tasks.notPausedTasks().size(), 0);
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TopologyMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TopologyMetadataTest.java
new file mode 100644
index 0000000000..52a103a4d4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TopologyMetadataTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.mock;
+
+public class TopologyMetadataTest {
+ final static String TOPOLOGY1 = "topology1";
+ final static String TOPOLOGY2 = "topology2";
+
+ @Test
+ public void testPauseResume() {
+ final InternalTopologyBuilder internalTopologyBuilder = mock(InternalTopologyBuilder.class);
+ final StreamsConfig config = new DummyStreamsConfig();
+
+ final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder,
+ config);
+
+ Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY1));
+ Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY2));
+
+ topologyMetadata.pauseTopology(TOPOLOGY1);
+ Assert.assertTrue(topologyMetadata.isPaused(TOPOLOGY1));
+ Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY2));
+
+ topologyMetadata.resumeTopology(TOPOLOGY1);
+ Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY1));
+ Assert.assertFalse(topologyMetadata.isPaused(TOPOLOGY2));
+ }
+}