You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/08/31 18:46:01 UTC
[kafka] branch trunk updated: KAFKA-10199: Shutdown state updater on task manager shutdown (#12569)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bc8f7d07d9 KAFKA-10199: Shutdown state updater on task manager shutdown (#12569)
bc8f7d07d9 is described below
commit bc8f7d07d9c544536dc24391bf5dbf170905733f
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Wed Aug 31 20:45:53 2022 +0200
KAFKA-10199: Shutdown state updater on task manager shutdown (#12569)
When the task manager is shut down, the state updater should also be
shut down. After the shutdown of the state updater, the tasks
in its output queues should be closed.
Reviewer: Guozhang Wang <wa...@gmail.com>
---
.../processor/internals/DefaultStateUpdater.java | 1 +
.../streams/processor/internals/TaskManager.java | 70 +++++++++++++++++-
.../kafka/streams/processor/internals/Tasks.java | 8 +--
.../streams/processor/internals/TasksRegistry.java | 6 +-
.../internals/DefaultStateUpdaterTest.java | 1 -
.../processor/internals/TaskManagerTest.java | 84 +++++++++++++++++++++-
.../streams/processor/internals/TasksTest.java | 4 +-
7 files changed, 158 insertions(+), 16 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index ba1606edf3..32bb33e0f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -236,6 +236,7 @@ public class DefaultStateUpdater implements StateUpdater {
task.maybeCheckpoint(true);
removedTasks.add(task);
});
+ updatingTasks.clear();
pausedTasks.forEach((id, task) -> {
removedTasks.add(task);
});
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f241621781..8eb31aef79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -365,8 +365,8 @@ public class TaskManager {
final Collection<Task> newStandbyTask = standbyTaskCreator.createTasks(standbyTasksToCreate);
if (stateUpdater == null) {
- tasks.addNewActiveTasks(newActiveTasks);
- tasks.addNewStandbyTasks(newStandbyTask);
+ tasks.addActiveTasks(newActiveTasks);
+ tasks.addStandbyTasks(newStandbyTask);
} else {
tasks.addPendingTaskToInit(newActiveTasks);
tasks.addPendingTaskToInit(newStandbyTask);
@@ -746,7 +746,7 @@ public class TaskManager {
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
try {
task.completeRestoration(offsetResetter);
- tasks.addNewActiveTask(task);
+ tasks.addActiveTask(task);
mainConsumer.resume(task.inputPartitions());
task.clearTaskTimeout();
} catch (final TimeoutException timeoutException) {
@@ -1143,6 +1143,8 @@ public class TaskManager {
}
void shutdown(final boolean clean) {
+ shutdownStateUpdater();
+
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// TODO: change type to `StreamTask`
@@ -1180,6 +1182,68 @@ public class TaskManager {
}
}
+ /**
+  * Shuts down the state updater, if one is configured, and then deals with the tasks it
+  * hands back: failed tasks are closed dirty, while restored and removed tasks are added
+  * to the task registry.
+  */
+ private void shutdownStateUpdater() {
+     // nothing to do when no state updater is configured
+     if (stateUpdater == null) {
+         return;
+     }
+
+     stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
+     closeFailedTasks();
+     addRestoredTasksToTaskRegistry();
+     addRemovedTasksToTaskRegistry();
+ }
+
+ /**
+  * Closes dirty every task that the state updater reported together with an exception.
+  *
+  * Flushing, suspending and closing the task producer are best-effort: a
+  * {@link RuntimeException} thrown by one of these steps is logged (including its stack
+  * trace) and the remaining steps still run. {@code closeDirty()} itself is not guarded.
+  */
+ private void closeFailedTasks() {
+     // collect the tasks of all drained (exception, tasks) pairs into a single set
+     final Set<Task> tasksToCloseDirty = stateUpdater.drainExceptionsAndFailedTasks().stream()
+         .flatMap(exAndTasks -> exAndTasks.getTasks().stream()).collect(Collectors.toSet());
+
+     for (final Task task : tasksToCloseDirty) {
+         try {
+             // we call this function only to flush the cache if necessary
+             // before suspending and closing the topology
+             task.prepareCommit();
+         } catch (final RuntimeException swallow) {
+             log.error("Error flushing caches of dirty task {} ", task.id(), swallow);
+         }
+
+         try {
+             task.suspend();
+         } catch (final RuntimeException swallow) {
+             // the trailing throwable argument makes the logger print the stack trace as well
+             log.error("Error suspending dirty task {}: {}", task.id(), swallow.getMessage(), swallow);
+         }
+
+         // not wrapped in a try/catch: an exception thrown here propagates to the caller
+         task.closeDirty();
+
+         try {
+             if (task.isActive()) {
+                 activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
+             }
+         } catch (final RuntimeException swallow) {
+             // the trailing throwable argument makes the logger print the stack trace as well
+             log.error("Error closing dirty task {}: {}", task.id(), swallow.getMessage(), swallow);
+         }
+     }
+ }
+
+ /**
+  * Drains the restored active tasks from the state updater without waiting and adds them
+  * to the task registry as active tasks.
+  */
+ private void addRestoredTasksToTaskRegistry() {
+     // widen the element type to Task, which is what the registry method accepts
+     final Set<Task> restoredTasks = stateUpdater.drainRestoredActiveTasks(Duration.ZERO)
+         .stream()
+         .map(t -> (Task) t)
+         .collect(Collectors.toSet());
+     tasks.addActiveTasks(restoredTasks);
+ }
+
+ /**
+  * Drains the removed tasks from the state updater and adds them to the task registry,
+  * active tasks as active tasks and all remaining tasks as standby tasks.
+  */
+ private void addRemovedTasksToTaskRegistry() {
+     final Set<Task> drainedTasks = stateUpdater.drainRemovedTasks();
+     final Set<Task> activeOnes = drainedTasks.stream()
+         .filter(Task::isActive)
+         .collect(Collectors.toSet());
+     // take the active tasks out of the drained set so that only the other tasks remain in it
+     drainedTasks.removeAll(activeOnes);
+
+     tasks.addActiveTasks(activeOnes);
+     tasks.addStandbyTasks(drainedTasks);
+ }
+
/**
* Closes and cleans up after the provided tasks, including closing their corresponding task producers
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 361b33f6b7..fe20e89069 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -154,16 +154,16 @@ class Tasks implements TasksRegistry {
}
@Override
- public void addNewActiveTasks(final Collection<Task> newTasks) {
+ public void addActiveTasks(final Collection<Task> newTasks) {
if (!newTasks.isEmpty()) {
for (final Task activeTask : newTasks) {
- addNewActiveTask(activeTask);
+ addActiveTask(activeTask);
}
}
}
@Override
- public void addNewActiveTask(final Task task) {
+ public void addActiveTask(final Task task) {
final TaskId taskId = task.id();
if (activeTasksPerId.containsKey(taskId)) {
@@ -182,7 +182,7 @@ class Tasks implements TasksRegistry {
}
@Override
- public void addNewStandbyTasks(final Collection<Task> newTasks) {
+ public void addStandbyTasks(final Collection<Task> newTasks) {
if (!newTasks.isEmpty()) {
for (final Task standbyTask : newTasks) {
final TaskId taskId = standbyTask.id();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index bba358e46b..909f5613ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -55,11 +55,11 @@ public interface TasksRegistry {
void addPendingTaskToInit(final Collection<Task> tasks);
- void addNewActiveTasks(final Collection<Task> newTasks);
+ void addActiveTasks(final Collection<Task> tasks);
- void addNewActiveTask(final Task task);
+ void addActiveTask(final Task task);
- void addNewStandbyTasks(final Collection<Task> newTasks);
+ void addStandbyTasks(final Collection<Task> tasks);
void removeTask(final Task taskToRemove);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 91485e5d3a..258d193f8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -128,7 +128,6 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
- stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build();
final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 22222a0ead..e1cc2c48a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
+import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTasks;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -596,7 +597,7 @@ public class TaskManagerTest {
Mockito.verify(task).completeRestoration(noOpResetter);
Mockito.verify(task).clearTaskTimeout();
- Mockito.verify(tasks).addNewActiveTask(task);
+ Mockito.verify(tasks).addActiveTask(task);
verify(consumer);
}
@@ -614,7 +615,7 @@ public class TaskManagerTest {
taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);
Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), Mockito.eq(timeoutException));
- Mockito.verify(tasks, never()).addNewActiveTask(task);
+ Mockito.verify(tasks, never()).addActiveTask(task);
Mockito.verify(task, never()).clearTaskTimeout();
verify(consumer);
}
@@ -872,7 +873,7 @@ public class TaskManagerTest {
taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);
- Mockito.verify(tasks).addNewActiveTask(taskToTransitToRunning);
+ Mockito.verify(tasks).addActiveTask(taskToTransitToRunning);
Mockito.verify(stateUpdater).add(recycledStandbyTask);
Mockito.verify(stateUpdater).add(recycledStandbyTask);
Mockito.verify(taskToCloseClean).closeClean();
@@ -2699,6 +2700,83 @@ public class TaskManagerTest {
verify(activeTaskCreator);
}
+ /**
+  * On shutdown the task manager must shut down the state updater and close dirty every
+  * failed task the state updater hands back, the active task as well as the standby task.
+  */
+ @Test
+ public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() {
+     final TasksRegistry tasks = mock(TasksRegistry.class);
+     final StreamTask failedStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
+         .inState(State.RESTORING).build();
+     final StandbyTask failedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+         .inState(State.RUNNING).build();
+     when(stateUpdater.drainExceptionsAndFailedTasks())
+         .thenReturn(Arrays.asList(
+             new ExceptionAndTasks(mkSet(failedStatefulTask), new RuntimeException()),
+             new ExceptionAndTasks(mkSet(failedStandbyTask), new RuntimeException()))
+         );
+     // calls expected on activeTaskCreator; a task producer is expected to be closed
+     // only for the failed active task
+     activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
+     activeTaskCreator.closeThreadProducerIfNeeded();
+     final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+     replay(activeTaskCreator);
+
+     taskManager.shutdown(true);
+
+     verify(activeTaskCreator);
+     Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+     Mockito.verify(failedStatefulTask).prepareCommit();
+     Mockito.verify(failedStatefulTask).suspend();
+     Mockito.verify(failedStatefulTask).closeDirty();
+     // the failed standby task must go through the same dirty-close sequence
+     Mockito.verify(failedStandbyTask).prepareCommit();
+     Mockito.verify(failedStandbyTask).suspend();
+     Mockito.verify(failedStandbyTask).closeDirty();
+ }
+
+ // On shutdown, restored active tasks drained from the state updater must be added to the
+ // task registry and then closed clean.
+ @Test
+ public void shouldShutDownStateUpdaterAndAddRestoredTasksToTaskRegistry() {
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
+ .inState(State.RESTORING).build();
+ final StreamTask statefulTask2 = statefulTask(taskId02, taskId02ChangelogPartitions)
+ .inState(State.RESTORING).build();
+ final Set<StreamTask> restoredActiveTasks = mkSet(statefulTask1, statefulTask2);
+ // same tasks typed as Task, used for the registry stub and the verification below
+ final Set<Task> restoredTasks = restoredActiveTasks.stream().map(t -> (Task) t).collect(Collectors.toSet());
+ when(stateUpdater.drainRestoredActiveTasks(Duration.ZERO)).thenReturn(restoredActiveTasks);
+ // the registry is a mock, so it is stubbed to report the restored tasks as its active tasks
+ when(tasks.activeTasks()).thenReturn(restoredTasks);
+ // calls expected on activeTaskCreator during shutdown, set up before replay(...)
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
+ activeTaskCreator.closeThreadProducerIfNeeded();
+ final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ replay(activeTaskCreator);
+
+ taskManager.shutdown(true);
+
+ verify(activeTaskCreator);
+ Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+ Mockito.verify(tasks).addActiveTasks(restoredTasks);
+ Mockito.verify(statefulTask1).closeClean();
+ Mockito.verify(statefulTask2).closeClean();
+ }
+
+ // On shutdown, removed tasks drained from the state updater must be added to the task
+ // registry (active tasks as active, the standby task as standby) and then closed clean.
+ @Test
+ public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() {
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final StreamTask removedStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask removedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+ .inState(State.RUNNING).build();
+ when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedStandbyTask, removedStatefulTask));
+ // the registry is a mock, so it is stubbed to report the removed tasks as its own
+ when(tasks.activeTasks()).thenReturn(mkSet(removedStatefulTask));
+ when(tasks.allTasks()).thenReturn(mkSet(removedStatefulTask, removedStandbyTask));
+ // calls expected on activeTaskCreator during shutdown, set up before replay(...)
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
+ activeTaskCreator.closeThreadProducerIfNeeded();
+ final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ replay(activeTaskCreator);
+
+ taskManager.shutdown(true);
+
+ verify(activeTaskCreator);
+ Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+ // the drained set must be split by task type before it reaches the registry
+ Mockito.verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
+ Mockito.verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
+ Mockito.verify(removedStatefulTask).closeClean();
+ Mockito.verify(removedStandbyTask).closeClean();
+ }
+
@Test
public void shouldInitializeNewActiveTasks() {
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index e51d4ea0f1..be1f5c4972 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -52,8 +52,8 @@ public class TasksTest {
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build();
final StreamTask statelessTask = statelessTask(TASK_1_0).build();
- tasks.addNewActiveTasks(mkSet(statefulTask, statelessTask));
- tasks.addNewStandbyTasks(Collections.singletonList(standbyTask));
+ tasks.addActiveTasks(mkSet(statefulTask, statelessTask));
+ tasks.addStandbyTasks(Collections.singletonList(standbyTask));
assertEquals(statefulTask, tasks.task(statefulTask.id()));
assertEquals(statelessTask, tasks.task(statelessTask.id()));