You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/07/19 16:44:19 UTC
[kafka] branch trunk updated: KAFKA-10199: Add RESUME in state updater (#12387)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 693e283802 KAFKA-10199: Add RESUME in state updater (#12387)
693e283802 is described below
commit 693e283802590b724ef441d5bf7acb6eeced91c5
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Jul 19 09:44:10 2022 -0700
KAFKA-10199: Add RESUME in state updater (#12387)
* Need to check enforceRestoreActive / transitToUpdateStandby when resuming a paused task.
* Do not expose another getResumedTasks since I think its callers only need getPausedTasks.
Reviewers: Bruno Cadonna <ca...@apache.org>
---
.../processor/internals/DefaultStateUpdater.java | 36 ++++-
.../streams/processor/internals/StateUpdater.java | 13 ++
.../streams/processor/internals/TaskAndAction.java | 10 +-
.../internals/DefaultStateUpdaterTest.java | 158 ++++++++++++++++++++-
.../processor/internals/TaskAndActionTest.java | 20 +++
5 files changed, 229 insertions(+), 8 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 08959bee00..7e7ec2a6f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -128,6 +128,9 @@ public class DefaultStateUpdater implements StateUpdater {
case PAUSE:
pauseTask(taskAndAction.getTaskId());
break;
+ case RESUME:
+ resumeTask(taskAndAction.getTaskId());
+ break;
}
}
} finally {
@@ -249,7 +252,7 @@ public class DefaultStateUpdater implements StateUpdater {
final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
if (existingTask != null) {
throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " +
- "should not try to add another " + (task.isActive() ? "Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+ "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
}
if (task.isActive()) {
@@ -304,6 +307,26 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
+ /**
+ * Moves a paused task back into the updating tasks of the state updater.
+ * If no paused task with the given ID exists, the request is only logged and otherwise ignored.
+ *
+ * @param taskId ID of the task to resume
+ */
+ private void resumeTask(final TaskId taskId) {
+ final Task pausedTask = pausedTasks.get(taskId);
+ if (pausedTask == null) {
+ log.debug("Task " + taskId + " was not resumed since it is not paused.");
+ return;
+ }
+
+ // Keep the order: first add to the updating tasks, then drop from the paused tasks.
+ updatingTasks.put(taskId, pausedTask);
+ pausedTasks.remove(taskId);
+
+ if (!pausedTask.isActive()) {
+ log.debug("Standby task " + pausedTask.id() + " was resumed to the updating tasks of the state updater");
+ // Only call transitToUpdateStandby() when this standby is now the sole updating task.
+ if (updatingTasks.size() == 1) {
+ changelogReader.transitToUpdateStandby();
+ }
+ return;
+ }
+
+ log.debug("Stateful active task " + pausedTask.id() + " was resumed to the updating tasks of the state updater");
+ changelogReader.enforceRestoreActive();
+ }
+
private boolean isStateless(final Task task) {
return task.changelogPartitions().isEmpty() && task.isActive();
}
@@ -451,6 +474,17 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
+ /**
+ * Enqueues a RESUME action for the given task under the tasks-and-actions lock and signals
+ * {@code tasksAndActionsCondition} so that waiters notice the new action. Does not wait for the
+ * action to be processed.
+ */
+ @Override
+ public void resume(final TaskId taskId) {
+ tasksAndActionsLock.lock();
+ try {
+ final TaskAndAction resumeAction = TaskAndAction.createResumeTask(taskId);
+ tasksAndActions.add(resumeAction);
+ tasksAndActionsCondition.signalAll();
+ } finally {
+ tasksAndActionsLock.unlock();
+ }
+ }
+
@Override
public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
final long timeoutMs = timeout.toMillis();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 516e47436b..69d521b600 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -107,6 +107,19 @@ public interface StateUpdater {
*/
void pause(final TaskId taskId);
+ /**
+ * Resume restoring a task (active or standby) in the state updater.
+ *
+ * This method does not block until the task is resumed.
+ *
+ * Only paused tasks are resumed. Restored tasks, removed tasks and failed tasks are not resumed,
+ * so this action is a no-op for them.
+ * Stateless tasks will never be resumed since they are immediately added to the
+ * restored active tasks.
+ *
+ * @param taskId ID of the task to resume
+ */
+ void resume(final TaskId taskId);
+
/**
* Drains the restored active tasks from the state updater.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
index 585374c339..cc93321a29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
@@ -25,7 +25,8 @@ public class TaskAndAction {
enum Action {
ADD,
REMOVE,
- PAUSE
+ PAUSE,
+ RESUME
}
private final Task task;
@@ -53,6 +54,11 @@ public class TaskAndAction {
return new TaskAndAction(null, taskId, Action.PAUSE);
}
+ /**
+ * Creates a RESUME action for the task with the given ID.
+ *
+ * @param taskId ID of the task to resume; must not be null
+ * @return an action of type {@code RESUME} that carries only the task ID
+ * @throws NullPointerException if {@code taskId} is null
+ */
+ public static TaskAndAction createResumeTask(final TaskId taskId) {
+ final TaskId nonNullTaskId = Objects.requireNonNull(taskId, "Task ID of task to resume is null!");
+ return new TaskAndAction(null, nonNullTaskId, Action.RESUME);
+ }
+
public Task getTask() {
if (action != Action.ADD) {
throw new IllegalStateException("Action type " + action + " cannot have a task!");
@@ -61,7 +67,7 @@ public class TaskAndAction {
}
public TaskId getTaskId() {
- if (action != Action.REMOVE && action != Action.PAUSE) {
+ if (action != Action.REMOVE && action != Action.PAUSE && action != Action.RESUME) {
throw new IllegalStateException("Action type " + action + " cannot have a task ID!");
}
return taskId;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 14b8237fe7..a6543bd620 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -617,7 +617,7 @@ class DefaultStateUpdaterTest {
}
@Test
- public void shouldIgnorePausingNotExistTasks() throws Exception {
+ public void shouldNotPausingNonExistTasks() throws Exception {
stateUpdater.start();
stateUpdater.pause(TASK_0_0);
@@ -723,6 +723,138 @@ class DefaultStateUpdaterTest {
verifyPausedTasks();
}
+ @Test
+ public void shouldResumeActiveStatefulTask() throws Exception {
+ final StreamTask activeTask =
+ createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+
+ shouldResumeStatefulTask(activeTask);
+
+ // presumably one call from add() and one from resume() -- the resume path calls enforceRestoreActive()
+ verify(changelogReader, times(2)).enforceRestoreActive();
+ }
+
+ @Test
+ public void shouldResumeStandbyTask() throws Exception {
+ final StandbyTask standbyTask =
+ createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+
+ shouldResumeStatefulTask(standbyTask);
+
+ // presumably one call from add() and one from resume(), each time as the sole updating task
+ verify(changelogReader, times(2)).transitToUpdateStandby();
+ }
+
+ /**
+ * Adds the given stateful task, pauses it and resumes it. After pausing, the task must be the only
+ * paused task and no task may be updating; after resuming, it must be the only updating task.
+ */
+ private void shouldResumeStatefulTask(final Task task) throws Exception {
+ stateUpdater.start();
+ stateUpdater.add(task);
+
+ // pause: task moves from the updating tasks to the paused tasks
+ stateUpdater.pause(task.id());
+ verifyPausedTasks(task);
+ verifyUpdatingTasks();
+
+ // resume: task moves back from the paused tasks to the updating tasks
+ stateUpdater.resume(task.id());
+ verifyPausedTasks();
+ verifyUpdatingTasks(task);
+ }
+
+ @Test
+ public void shouldNotResumeNonExistingTasks() throws Exception {
+ final TaskId neverAddedTaskId = TASK_0_0;
+ stateUpdater.start();
+
+ // Resuming a task the state updater never saw must leave every task collection empty.
+ stateUpdater.resume(neverAddedTaskId);
+
+ verifyPausedTasks();
+ verifyRestoredActiveTasks();
+ verifyRemovedTasks();
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+ }
+
+ @Test
+ public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception {
+ // The changelog of restoredTask is reported as completed, so it ends up in the restored active tasks;
+ // controlTask's changelog is never completed, so it stays in the updating tasks.
+ final StreamTask restoredTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+ when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+
+ stateUpdater.start();
+ stateUpdater.add(restoredTask);
+ stateUpdater.add(controlTask);
+ verifyRestoredActiveTasks(restoredTask);
+
+ // Neither task is paused, so both resume requests must be no-ops.
+ stateUpdater.resume(restoredTask.id());
+ stateUpdater.resume(controlTask.id());
+
+ verifyPausedTasks();
+ verifyRestoredActiveTasks(restoredTask);
+ verifyUpdatingTasks(controlTask);
+ verifyExceptionsAndFailedTasks();
+ }
+
+ @Test
+ public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ // Delegate to the resume-specific helper so that resume() is actually exercised for an active task.
+ shouldNotResumeTaskInRemovedTasks(task);
+ }
+
+ @Test
+ public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
+ final StandbyTask standbyTask =
+ createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotResumeTaskInRemovedTasks(standbyTask);
+ }
+
+ /**
+ * Adds the given task, removes it, and then tries to resume it. Resuming a removed task must be a
+ * no-op: the task stays in the removed tasks and is neither updating nor paused.
+ */
+ private void shouldNotResumeTaskInRemovedTasks(final Task task) throws Exception {
+ when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+ stateUpdater.start();
+ stateUpdater.add(task);
+
+ verifyUpdatingTasks(task);
+ verifyExceptionsAndFailedTasks();
+
+ stateUpdater.remove(task.id());
+
+ verifyRemovedTasks(task);
+ verifyUpdatingTasks();
+
+ stateUpdater.resume(task.id());
+
+ // An empty-updating check alone is trivially true here; also assert the task did not move.
+ verifyRemovedTasks(task);
+ verifyUpdatingTasks();
+ verifyPausedTasks();
+ }
+
+ @Test
+ public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ // Delegate to the resume-specific helper so that resume() is actually exercised for an active task.
+ shouldNotResumeTaskInFailedTasks(task);
+ }
+
+ @Test
+ public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
+ final StandbyTask standbyTask =
+ createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotResumeTaskInFailedTasks(standbyTask);
+ }
+
+ /**
+ * Adds the given task together with an active control task and makes the first restore() call
+ * throw a StreamsException attributed to the given task. Resuming the failed task (and the
+ * non-paused control task) must not change anything: the task stays failed and only the control
+ * task keeps updating.
+ */
+ private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception {
+ final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ final StreamsException restoreFailure = new StreamsException("Something happened", task.id());
+ when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+ final Map<TaskId, Task> tasksToRestore = mkMap(
+ mkEntry(task.id(), task),
+ mkEntry(controlTask.id(), controlTask)
+ );
+ // first restore() with both tasks throws, subsequent calls succeed
+ doThrow(restoreFailure)
+ .doNothing()
+ .when(changelogReader).restore(tasksToRestore);
+ stateUpdater.start();
+
+ stateUpdater.add(task);
+ stateUpdater.add(controlTask);
+ final ExceptionAndTasks expectedFailure = new ExceptionAndTasks(mkSet(task), restoreFailure);
+ verifyExceptionsAndFailedTasks(expectedFailure);
+ verifyUpdatingTasks(controlTask);
+
+ stateUpdater.resume(task.id());
+ stateUpdater.resume(controlTask.id());
+
+ verifyExceptionsAndFailedTasks(expectedFailure);
+ verifyUpdatingTasks(controlTask);
+ }
+
@Test
public void shouldDrainRemovedTasks() throws Exception {
assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
@@ -1146,7 +1278,11 @@ class DefaultStateUpdaterTest {
private void verifyRestoredActiveTasks(final StreamTask... tasks) throws Exception {
if (tasks.length == 0) {
- assertTrue(stateUpdater.getRestoredActiveTasks().isEmpty());
+ waitForCondition(
+ () -> stateUpdater.getRestoredActiveTasks().isEmpty(),
+ VERIFICATION_TIMEOUT,
+ "Did not get empty restored active task within the given timeout!"
+ );
} else {
final Set<StreamTask> expectedRestoredTasks = mkSet(tasks);
final Set<StreamTask> restoredTasks = new HashSet<>();
@@ -1179,7 +1315,11 @@ class DefaultStateUpdaterTest {
private void verifyUpdatingTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
- assertTrue(stateUpdater.getUpdatingTasks().isEmpty());
+ waitForCondition(
+ () -> stateUpdater.getUpdatingTasks().isEmpty(),
+ VERIFICATION_TIMEOUT,
+ "Did not get empty updating task within the given timeout!"
+ );
} else {
final Set<Task> expectedUpdatingTasks = mkSet(tasks);
final Set<Task> updatingTasks = new HashSet<>();
@@ -1211,7 +1351,11 @@ class DefaultStateUpdaterTest {
private void verifyRemovedTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
- assertTrue(stateUpdater.getRemovedTasks().isEmpty());
+ waitForCondition(
+ () -> stateUpdater.getRemovedTasks().isEmpty(),
+ VERIFICATION_TIMEOUT,
+ "Did not get empty removed task within the given timeout!"
+ );
} else {
final Set<Task> expectedRemovedTasks = mkSet(tasks);
final Set<Task> removedTasks = new HashSet<>();
@@ -1229,7 +1373,11 @@ class DefaultStateUpdaterTest {
private void verifyPausedTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
- assertTrue(stateUpdater.getPausedTasks().isEmpty());
+ waitForCondition(
+ () -> stateUpdater.getPausedTasks().isEmpty(),
+ VERIFICATION_TIMEOUT,
+ "Did not get empty paused task within the given timeout!"
+ );
} else {
final Set<Task> expectedPausedTasks = mkSet(tasks);
final Set<Task> pausedTasks = new HashSet<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
index f994ef75c9..2bc9d05326 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
@@ -22,9 +22,11 @@ import org.junit.jupiter.api.Test;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.ADD;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.PAUSE;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.REMOVE;
+import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.RESUME;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.createAddTask;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.createPauseTask;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.createRemoveTask;
+import static org.apache.kafka.streams.processor.internals.TaskAndAction.createResumeTask;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -68,6 +70,18 @@ class TaskAndActionTest {
assertEquals("Action type PAUSE cannot have a task!", exception.getMessage());
}
+ @Test
+ public void shouldCreateResumeTaskAction() {
+ final TaskId taskId = new TaskId(0, 0);
+
+ final TaskAndAction resumeTask = createResumeTask(taskId);
+
+ assertEquals(RESUME, resumeTask.getAction());
+ assertEquals(taskId, resumeTask.getTaskId());
+ // A RESUME action carries only a task ID, so asking for the task object must fail.
+ final Exception exception = assertThrows(IllegalStateException.class, resumeTask::getTask);
+ assertEquals("Action type RESUME cannot have a task!", exception.getMessage());
+ }
+
@Test
public void shouldThrowIfAddTaskActionIsCreatedWithNullTask() {
final Exception exception = assertThrows(NullPointerException.class, () -> createAddTask(null));
@@ -85,4 +99,10 @@ class TaskAndActionTest {
final Exception exception = assertThrows(NullPointerException.class, () -> createPauseTask(null));
assertTrue(exception.getMessage().contains("Task ID of task to pause is null!"));
}
+
+ @Test
+ public void shouldThrowIfResumeTaskActionIsCreatedWithNullTaskId() {
+ final NullPointerException exception =
+ assertThrows(NullPointerException.class, () -> createResumeTask(null));
+ final String message = exception.getMessage();
+ assertTrue(message.contains("Task ID of task to resume is null!"));
+ }
}
\ No newline at end of file