You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/07/18 23:43:01 UTC
[kafka] branch trunk updated: KAFKA-10199: Add PAUSE in state updater (#12386)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 309e0f986e KAFKA-10199: Add PAUSE in state updater (#12386)
309e0f986e is described below
commit 309e0f986e97be966c797f7729eb1e94ef5400a9
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Jul 18 16:42:48 2022 -0700
KAFKA-10199: Add PAUSE in state updater (#12386)
* Add pause action to task-updater.
* When removing a task, also check the paused tasks in addition to the updating tasks.
* Also, I realized we did not check whether a task with an already-present ID is being added, so this PR adds that check as well.
Reviewers: Bruno Cadonna <ca...@apache.org>
---
.../processor/internals/DefaultStateUpdater.java | 62 ++++-
.../streams/processor/internals/StateUpdater.java | 13 +
.../processor/internals/StoreChangelogReader.java | 2 +-
.../streams/processor/internals/TaskAndAction.java | 10 +-
.../internals/DefaultStateUpdaterTest.java | 282 ++++++++++++++++++++-
.../processor/internals/TaskAndActionTest.java | 20 ++
6 files changed, 379 insertions(+), 10 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 22fd48a4ab..08959bee00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -83,7 +83,7 @@ public class DefaultStateUpdater implements StateUpdater {
}
public boolean onlyStandbyTasksLeft() {
- return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive());
+ return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive);
}
@Override
@@ -125,6 +125,9 @@ public class DefaultStateUpdater implements StateUpdater {
case REMOVE:
removeTask(taskAndAction.getTaskId());
break;
+ case PAUSE:
+ pauseTask(taskAndAction.getTaskId());
+ break;
}
}
} finally {
@@ -243,7 +246,12 @@ public class DefaultStateUpdater implements StateUpdater {
addToRestoredTasks((StreamTask) task);
log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
} else {
- updatingTasks.put(task.id(), task);
+ final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
+ if (existingTask != null) {
+ throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " +
+ "should not try to add another " + (task.isActive() ? "Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+ }
+
if (task.isActive()) {
log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
changelogReader.enforceRestoreActive();
@@ -257,8 +265,9 @@ public class DefaultStateUpdater implements StateUpdater {
}
private void removeTask(final TaskId taskId) {
- final Task task = updatingTasks.get(taskId);
- if (task != null) {
+ final Task task;
+ if (updatingTasks.containsKey(taskId)) {
+ task = updatingTasks.get(taskId);
task.maybeCheckpoint(true);
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
changelogReader.unregister(changelogPartitions);
@@ -267,8 +276,31 @@ public class DefaultStateUpdater implements StateUpdater {
transitToUpdateStandbysIfOnlyStandbysLeft();
log.debug((task.isActive() ? "Active" : "Standby")
+ " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+ } else if (pausedTasks.containsKey(taskId)) {
+ task = pausedTasks.get(taskId);
+ final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+ changelogReader.unregister(changelogPartitions);
+ removedTasks.add(task);
+ pausedTasks.remove(taskId);
+ log.debug((task.isActive() ? "Active" : "Standby")
+ + " task " + task.id() + " was removed from the paused tasks and added to the removed tasks.");
+ } else {
+ log.debug("Task " + taskId + " was not removed since it is not updating or paused.");
+ }
+ }
+
+ private void pauseTask(final TaskId taskId) {
+ final Task task = updatingTasks.get(taskId);
+ if (task != null) {
+ // do not need to unregister changelog partitions for paused tasks
+ task.maybeCheckpoint(true);
+ pausedTasks.put(taskId, task);
+ updatingTasks.remove(taskId);
+ transitToUpdateStandbysIfOnlyStandbysLeft();
+ log.debug((task.isActive() ? "Active" : "Standby")
+ + " task " + task.id() + " was paused from the updating tasks and added to the paused tasks.");
} else {
- log.debug("Task " + taskId + " was not removed since it is not updating.");
+ log.debug("Task " + taskId + " was not paused since it is not updating.");
}
}
@@ -333,6 +365,7 @@ public class DefaultStateUpdater implements StateUpdater {
private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
+ private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>();
private final long commitIntervalMs;
private long lastCommitMs;
@@ -407,6 +440,17 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
+ @Override
+ public void pause(final TaskId taskId) {
+ tasksAndActionsLock.lock();
+ try {
+ tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
+ tasksAndActionsCondition.signalAll();
+ } finally {
+ tasksAndActionsLock.unlock();
+ }
+ }
+
@Override
public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
final long timeoutMs = timeout.toMillis();
@@ -478,6 +522,10 @@ public class DefaultStateUpdater implements StateUpdater {
return Collections.unmodifiableSet(new HashSet<>(removedTasks));
}
+ public Set<Task> getPausedTasks() {
+ return Collections.unmodifiableSet(new HashSet<>(pausedTasks.values()));
+ }
+
@Override
public Set<Task> getTasks() {
return executeWithQueuesLocked(() -> getStreamOfTasks().collect(Collectors.toSet()));
@@ -520,6 +568,8 @@ public class DefaultStateUpdater implements StateUpdater {
restoredActiveTasks.stream(),
Stream.concat(
exceptionsAndFailedTasks.stream().flatMap(exceptionAndTasks -> exceptionAndTasks.getTasks().stream()),
- removedTasks.stream()))));
+ Stream.concat(
+ getPausedTasks().stream(),
+ removedTasks.stream())))));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 1b229bc818..516e47436b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -94,6 +94,19 @@ public interface StateUpdater {
*/
void remove(final TaskId taskId);
+ /**
+ * Pause a task (active or standby) from restoring in the state updater.
+ *
+ * This method does not block until the task is paused.
+ *
+ * Restored tasks, removed tasks and failed tasks are not paused, so this action would be a no-op for them.
+ * Stateless tasks will never be paused since they are immediately added to the
+ * restored active tasks.
+ *
+ * @param taskId ID of the task to pause
+ */
+ void pause(final TaskId taskId);
+
/**
* Drains the restored active tasks from the state updater.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5240534ce7..f8926e70bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -481,7 +481,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
private void pauseResumePartitions(final Map<TaskId, Task> tasks,
- final Set<TopicPartition> restoringChangelogs) {
+ final Set<TopicPartition> restoringChangelogs) {
if (state == ChangelogReaderState.ACTIVE_RESTORING) {
updatePartitionsByType(tasks, restoringChangelogs, TaskType.ACTIVE);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
index 4c4316a864..585374c339 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
@@ -24,7 +24,8 @@ public class TaskAndAction {
enum Action {
ADD,
- REMOVE
+ REMOVE,
+ PAUSE
}
private final Task task;
@@ -47,6 +48,11 @@ public class TaskAndAction {
return new TaskAndAction(null, taskId, Action.REMOVE);
}
+ public static TaskAndAction createPauseTask(final TaskId taskId) {
+ Objects.requireNonNull(taskId, "Task ID of task to pause is null!");
+ return new TaskAndAction(null, taskId, Action.PAUSE);
+ }
+
public Task getTask() {
if (action != Action.ADD) {
throw new IllegalStateException("Action type " + action + " cannot have a task!");
@@ -55,7 +61,7 @@ public class TaskAndAction {
}
public TaskId getTaskId() {
- if (action != Action.REMOVE) {
+ if (action != Action.REMOVE && action != Action.PAUSE) {
throw new IllegalStateException("Action type " + action + " cannot have a task ID!");
}
return taskId;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 465ae4a1c5..14b8237fe7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -83,7 +83,6 @@ class DefaultStateUpdaterTest {
private final Time time = new MockTime(1L);
private final StreamsConfig config = new StreamsConfig(configProps());
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
- private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };
private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
@AfterEach
@@ -152,6 +151,42 @@ class DefaultStateUpdaterTest {
}
}
+ @Test
+ public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
+ final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldThrowIfAddingTasksWithSameId(task1, task2);
+ }
+
+ @Test
+ public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
+ final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ shouldThrowIfAddingTasksWithSameId(task1, task2);
+ }
+
+ @Test
+ public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
+ final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ shouldThrowIfAddingTasksWithSameId(task1, task2);
+ }
+
+ @Test
+ public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception {
+ final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ shouldThrowIfAddingTasksWithSameId(task2, task1);
+ }
+
+ private void shouldThrowIfAddingTasksWithSameId(final Task task1, final Task task2) throws Exception {
+ stateUpdater.start();
+ stateUpdater.add(task1);
+ stateUpdater.add(task2);
+
+ verifyFailedTasks(IllegalStateException.class, task1);
+ }
+
@Test
public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
@@ -177,6 +212,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
+ verifyPausedTasks();
}
@Test
@@ -200,6 +236,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
+ verifyPausedTasks();
verify(changelogReader, times(1)).enforceRestoreActive();
verify(changelogReader, atLeast(3)).restore(anyMap());
verify(changelogReader, never()).transitToUpdateStandby();
@@ -231,6 +268,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
+ verifyPausedTasks();
verify(changelogReader, times(3)).enforceRestoreActive();
verify(changelogReader, atLeast(4)).restore(anyMap());
verify(changelogReader, never()).transitToUpdateStandby();
@@ -286,6 +324,7 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
+ verifyPausedTasks();
verify(changelogReader, times(1)).transitToUpdateStandby();
verify(changelogReader, timeout(VERIFICATION_TIMEOUT).atLeast(1)).restore(anyMap());
verify(changelogReader, never()).enforceRestoreActive();
@@ -314,6 +353,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingStandbyTasks(task4, task3);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
+ verifyPausedTasks();
verify(changelogReader, atLeast(3)).restore(anyMap());
final InOrder orderVerifier = inOrder(changelogReader, task1, task2);
orderVerifier.verify(changelogReader, times(2)).enforceRestoreActive();
@@ -424,10 +464,37 @@ class DefaultStateUpdaterTest {
verifyCheckpointTasks(true, task);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
+ verifyPausedTasks();
verifyExceptionsAndFailedTasks();
verify(changelogReader).unregister(task.changelogPartitions());
}
+ @Test
+ public void shouldRemovePausedTask() throws Exception {
+ final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+
+ stateUpdater.start();
+ stateUpdater.add(task1);
+ stateUpdater.add(task2);
+
+ stateUpdater.pause(task1.id());
+ stateUpdater.pause(task2.id());
+
+ verifyPausedTasks(task1, task2);
+ verifyRemovedTasks();
+ verifyUpdatingTasks();
+
+ stateUpdater.remove(task1.id());
+ stateUpdater.remove(task2.id());
+
+ verifyRemovedTasks(task1, task2);
+ verifyPausedTasks();
+ verifyCheckpointTasks(true, task1, task2);
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+ }
+
@Test
public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
@@ -455,6 +522,7 @@ class DefaultStateUpdaterTest {
verifyRemovedTasks(controlTask);
verifyRestoredActiveTasks(task);
verifyUpdatingTasks();
+ verifyPausedTasks();
verifyExceptionsAndFailedTasks();
}
@@ -493,11 +561,168 @@ class DefaultStateUpdaterTest {
stateUpdater.remove(controlTask.id());
verifyRemovedTasks(controlTask);
+ verifyPausedTasks();
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verifyUpdatingTasks();
verifyRestoredActiveTasks();
}
+ @Test
+ public void shouldPauseActiveStatefulTask() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldPauseStatefulTask(task);
+ verify(changelogReader, never()).transitToUpdateStandby();
+ }
+
+ @Test
+ public void shouldPauseStandbyTask() throws Exception {
+ final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldPauseStatefulTask(task);
+ verify(changelogReader, times(1)).transitToUpdateStandby();
+ }
+
+ @Test
+ public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
+ final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+
+ stateUpdater.start();
+ stateUpdater.add(task1);
+ stateUpdater.add(task2);
+
+ stateUpdater.pause(task1.id());
+
+ verifyPausedTasks(task1);
+ verifyCheckpointTasks(true, task1);
+ verifyRestoredActiveTasks();
+ verifyRemovedTasks();
+ verifyUpdatingTasks(task2);
+ verifyExceptionsAndFailedTasks();
+ verify(changelogReader, times(1)).enforceRestoreActive();
+ verify(changelogReader, times(1)).transitToUpdateStandby();
+ }
+
+ private void shouldPauseStatefulTask(final Task task) throws Exception {
+ stateUpdater.start();
+ stateUpdater.add(task);
+
+ stateUpdater.pause(task.id());
+
+ verifyPausedTasks(task);
+ verifyCheckpointTasks(true, task);
+ verifyRestoredActiveTasks();
+ verifyRemovedTasks();
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+ }
+
+ @Test
+ public void shouldIgnorePausingNotExistTasks() throws Exception {
+ stateUpdater.start();
+ stateUpdater.pause(TASK_0_0);
+
+ verifyPausedTasks();
+ verifyRestoredActiveTasks();
+ verifyRemovedTasks();
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+ }
+
+ @Test
+ public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+ when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+ stateUpdater.start();
+ stateUpdater.add(task);
+ stateUpdater.add(controlTask);
+ verifyRestoredActiveTasks(task);
+
+ stateUpdater.pause(task.id());
+ stateUpdater.pause(controlTask.id());
+
+ verifyPausedTasks(controlTask);
+ verifyRestoredActiveTasks(task);
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+ }
+
+ @Test
+ public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotPauseTaskInFailedTasks(task);
+ }
+
+ @Test
+ public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
+ final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotPauseTaskInFailedTasks(task);
+ }
+
+ private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception {
+ final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ final StreamsException streamsException = new StreamsException("Something happened", task.id());
+ when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+ final Map<TaskId, Task> updatingTasks = mkMap(
+ mkEntry(task.id(), task),
+ mkEntry(controlTask.id(), controlTask)
+ );
+ doThrow(streamsException)
+ .doNothing()
+ .when(changelogReader).restore(updatingTasks);
+ stateUpdater.start();
+
+ stateUpdater.add(task);
+ stateUpdater.add(controlTask);
+ final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task), streamsException);
+ verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+
+ stateUpdater.pause(task.id());
+ stateUpdater.pause(controlTask.id());
+
+ verifyPausedTasks(controlTask);
+ verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+ verifyUpdatingTasks();
+ verifyRestoredActiveTasks();
+ }
+
+ @Test
+ public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotPauseTaskInRemovedTasks(task);
+ }
+
+ @Test
+ public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
+ final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotPauseTaskInRemovedTasks(task);
+ }
+
+ private void shouldNotPauseTaskInRemovedTasks(final Task task) throws Exception {
+ when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+ stateUpdater.start();
+ stateUpdater.add(task);
+
+ stateUpdater.remove(task.id());
+
+ verifyRemovedTasks(task);
+ verifyCheckpointTasks(true, task);
+ verifyRestoredActiveTasks();
+ verifyUpdatingTasks();
+ verifyPausedTasks();
+ verifyExceptionsAndFailedTasks();
+ verify(changelogReader).unregister(task.changelogPartitions());
+
+ stateUpdater.pause(task.id());
+
+ verifyRemovedTasks(task);
+ verifyUpdatingTasks();
+ verifyPausedTasks();
+ }
+
@Test
public void shouldDrainRemovedTasks() throws Exception {
assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
@@ -543,6 +768,7 @@ class DefaultStateUpdaterTest {
final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task1, task2), streamsException);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verifyRemovedTasks();
+ verifyPausedTasks();
verifyUpdatingTasks();
verifyRestoredActiveTasks();
}
@@ -582,6 +808,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks(task2);
verifyRestoredActiveTasks();
verifyRemovedTasks();
+ verifyPausedTasks();
}
@Test
@@ -630,6 +857,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks();
verifyRestoredActiveTasks();
verifyRemovedTasks();
+ verifyPausedTasks();
}
@Test
@@ -882,6 +1110,22 @@ class DefaultStateUpdaterTest {
verifyGetTasks(mkSet(), mkSet());
}
+ @Test
+ public void shouldGetTasksFromPausedTasks() throws Exception {
+ final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_0));
+ stateUpdater.start();
+ stateUpdater.add(activeTask);
+ stateUpdater.add(standbyTask);
+
+ stateUpdater.pause(activeTask.id());
+ stateUpdater.pause(standbyTask.id());
+
+ verifyPausedTasks(activeTask, standbyTask);
+
+ verifyGetTasks(mkSet(activeTask), mkSet(standbyTask));
+ }
+
private void verifyGetTasks(final Set<StreamTask> expectedActiveTasks,
final Set<StandbyTask> expectedStandbyTasks) {
final Set<Task> tasks = stateUpdater.getTasks();
@@ -983,6 +1227,24 @@ class DefaultStateUpdaterTest {
}
}
+ private void verifyPausedTasks(final Task... tasks) throws Exception {
+ if (tasks.length == 0) {
+ assertTrue(stateUpdater.getPausedTasks().isEmpty());
+ } else {
+ final Set<Task> expectedPausedTasks = mkSet(tasks);
+ final Set<Task> pausedTasks = new HashSet<>();
+ waitForCondition(
+ () -> {
+ pausedTasks.addAll(stateUpdater.getPausedTasks());
+ return pausedTasks.containsAll(expectedPausedTasks)
+ && pausedTasks.size() == expectedPausedTasks.size();
+ },
+ VERIFICATION_TIMEOUT,
+ "Did not get all paused task within the given timeout!"
+ );
+ }
+ }
+
private void verifyDrainingRemovedTasks(final Task... tasks) throws Exception {
final Set<Task> expectedRemovedTasks = mkSet(tasks);
final Set<Task> removedTasks = new HashSet<>();
@@ -1012,6 +1274,24 @@ class DefaultStateUpdaterTest {
);
}
+ private void verifyFailedTasks(final Class<? extends RuntimeException> clazz, final Task... tasks) throws Exception {
+ final List<Task> expectedFailedTasks = Arrays.asList(tasks);
+ final Set<Task> failedTasks = new HashSet<>();
+ waitForCondition(
+ () -> {
+ for (final ExceptionAndTasks exceptionsAndTasks : stateUpdater.getExceptionsAndFailedTasks()) {
+ if (clazz.isInstance(exceptionsAndTasks.exception())) {
+ failedTasks.addAll(exceptionsAndTasks.getTasks());
+ }
+ }
+ return failedTasks.containsAll(expectedFailedTasks)
+ && failedTasks.size() == expectedFailedTasks.size();
+ },
+ VERIFICATION_TIMEOUT,
+ "Did not get all exceptions and failed tasks within the given timeout!"
+ );
+ }
+
private void verifyDrainingExceptionsAndFailedTasks(final ExceptionAndTasks... exceptionsAndTasks) throws Exception {
final List<ExceptionAndTasks> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks);
final List<ExceptionAndTasks> failedTasks = new ArrayList<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
index 39b927ee09..f994ef75c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
@@ -20,8 +20,10 @@ import org.apache.kafka.streams.processor.TaskId;
import org.junit.jupiter.api.Test;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.ADD;
+import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.PAUSE;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.REMOVE;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.createAddTask;
+import static org.apache.kafka.streams.processor.internals.TaskAndAction.createPauseTask;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.createRemoveTask;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -54,6 +56,18 @@ class TaskAndActionTest {
assertEquals("Action type REMOVE cannot have a task!", exception.getMessage());
}
+ @Test
+ public void shouldCreatePauseTaskAction() {
+ final TaskId taskId = new TaskId(0, 0);
+
+ final TaskAndAction pauseTask = createPauseTask(taskId);
+
+ assertEquals(PAUSE, pauseTask.getAction());
+ assertEquals(taskId, pauseTask.getTaskId());
+ final Exception exception = assertThrows(IllegalStateException.class, pauseTask::getTask);
+ assertEquals("Action type PAUSE cannot have a task!", exception.getMessage());
+ }
+
@Test
public void shouldThrowIfAddTaskActionIsCreatedWithNullTask() {
final Exception exception = assertThrows(NullPointerException.class, () -> createAddTask(null));
@@ -65,4 +79,10 @@ class TaskAndActionTest {
final Exception exception = assertThrows(NullPointerException.class, () -> createRemoveTask(null));
assertTrue(exception.getMessage().contains("Task ID of task to remove is null!"));
}
+
+ @Test
+ public void shouldThrowIfPauseTaskActionIsCreatedWithNullTaskId() {
+ final Exception exception = assertThrows(NullPointerException.class, () -> createPauseTask(null));
+ assertTrue(exception.getMessage().contains("Task ID of task to pause is null!"));
+ }
}
\ No newline at end of file