You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/08/31 18:46:01 UTC

[kafka] branch trunk updated: KAFKA-10199: Shutdown state updater on task manager shutdown (#12569)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bc8f7d07d9 KAFKA-10199: Shutdown state updater on task manager shutdown (#12569)
bc8f7d07d9 is described below

commit bc8f7d07d9c544536dc24391bf5dbf170905733f
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Wed Aug 31 20:45:53 2022 +0200

    KAFKA-10199: Shutdown state updater on task manager shutdown (#12569)
    
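    When the task manager is shut down, the state updater should also
    shut down. After the state updater has shut down, the tasks in its
    output queues must not be leaked: failed tasks are closed dirty,
    while restored and removed tasks are added to the task registry so
    that they are closed together with the other tasks of the task
    manager.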
    
    Reviewer: Guozhang Wang <wa...@gmail.com>
---
 .../processor/internals/DefaultStateUpdater.java   |  1 +
 .../streams/processor/internals/TaskManager.java   | 70 +++++++++++++++++-
 .../kafka/streams/processor/internals/Tasks.java   |  8 +--
 .../streams/processor/internals/TasksRegistry.java |  6 +-
 .../internals/DefaultStateUpdaterTest.java         |  1 -
 .../processor/internals/TaskManagerTest.java       | 84 +++++++++++++++++++++-
 .../streams/processor/internals/TasksTest.java     |  4 +-
 7 files changed, 158 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index ba1606edf3..32bb33e0f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -236,6 +236,7 @@ public class DefaultStateUpdater implements StateUpdater {
                 task.maybeCheckpoint(true);
                 removedTasks.add(task);
             });
+            updatingTasks.clear();
             pausedTasks.forEach((id, task) -> {
                 removedTasks.add(task);
             });
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f241621781..8eb31aef79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -365,8 +365,8 @@ public class TaskManager {
         final Collection<Task> newStandbyTask = standbyTaskCreator.createTasks(standbyTasksToCreate);
 
         if (stateUpdater == null) {
-            tasks.addNewActiveTasks(newActiveTasks);
-            tasks.addNewStandbyTasks(newStandbyTask);
+            tasks.addActiveTasks(newActiveTasks);
+            tasks.addStandbyTasks(newStandbyTask);
         } else {
             tasks.addPendingTaskToInit(newActiveTasks);
             tasks.addPendingTaskToInit(newStandbyTask);
@@ -746,7 +746,7 @@ public class TaskManager {
                                               final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
         try {
             task.completeRestoration(offsetResetter);
-            tasks.addNewActiveTask(task);
+            tasks.addActiveTask(task);
             mainConsumer.resume(task.inputPartitions());
             task.clearTaskTimeout();
         } catch (final TimeoutException timeoutException) {
@@ -1143,6 +1143,8 @@ public class TaskManager {
     }
 
     void shutdown(final boolean clean) {
+        shutdownStateUpdater();
+
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
         // TODO: change type to `StreamTask`
@@ -1180,6 +1182,85 @@ public class TaskManager {
         }
     }
 
+    /**
+     * Shuts down the state updater (if one is used) and drains its output queues so that no task it
+     * owned is leaked: failed tasks are closed dirty, while restored and removed tasks are added to the
+     * task registry so that the rest of {@link #shutdown(boolean)} closes them with all other tasks.
+     */
+    private void shutdownStateUpdater() {
+        if (stateUpdater != null) {
+            stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
+            closeFailedTasks();
+            addRestoredTasksToTaskRegistry();
+            addRemovedTasksToTaskRegistry();
+        }
+    }
+
+    /**
+     * Closes all tasks that the state updater reported as failed in dirty mode. Errors from flushing,
+     * suspending, or closing the task producer are logged together with their stack trace and then
+     * swallowed, so that they do not abort closing the remaining failed tasks.
+     */
+    private void closeFailedTasks() {
+        final Set<Task> tasksToCloseDirty = stateUpdater.drainExceptionsAndFailedTasks().stream()
+            .flatMap(exAndTasks -> exAndTasks.getTasks().stream()).collect(Collectors.toSet());
+
+        for (final Task task : tasksToCloseDirty) {
+            try {
+                // we call this function only to flush the caches if necessary
+                // before suspending and closing the topology
+                task.prepareCommit();
+            } catch (final RuntimeException swallow) {
+                log.error("Error flushing caches of dirty task {}", task.id(), swallow);
+            }
+
+            try {
+                task.suspend();
+            } catch (final RuntimeException swallow) {
+                log.error("Error suspending dirty task {}", task.id(), swallow);
+            }
+
+            task.closeDirty();
+
+            try {
+                if (task.isActive()) {
+                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
+                }
+            } catch (final RuntimeException swallow) {
+                log.error("Error closing the task producer of dirty task {}", task.id(), swallow);
+            }
+        }
+    }
+
+    /**
+     * Adds the active tasks whose restoration the state updater completed to the task registry.
+     */
+    private void addRestoredTasksToTaskRegistry() {
+        tasks.addActiveTasks(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).stream()
+            .map(t -> (Task) t)
+            .collect(Collectors.toSet())
+        );
+    }
+
+    /**
+     * Adds the tasks removed from the state updater to the task registry, active tasks as active tasks
+     * and all other tasks as standby tasks. The set returned by the state updater is only read, so this
+     * does not rely on it being mutable.
+     */
+    private void addRemovedTasksToTaskRegistry() {
+        final Set<Task> removedActiveTasks = new HashSet<>();
+        final Set<Task> removedStandbyTasks = new HashSet<>();
+        for (final Task task : stateUpdater.drainRemovedTasks()) {
+            if (task.isActive()) {
+                removedActiveTasks.add(task);
+            } else {
+                removedStandbyTasks.add(task);
+            }
+        }
+        tasks.addActiveTasks(removedActiveTasks);
+        tasks.addStandbyTasks(removedStandbyTasks);
+    }
+
     /**
      * Closes and cleans up after the provided tasks, including closing their corresponding task producers
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 361b33f6b7..fe20e89069 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -154,16 +154,16 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public void addNewActiveTasks(final Collection<Task> newTasks) {
+    public void addActiveTasks(final Collection<Task> newTasks) {
         if (!newTasks.isEmpty()) {
             for (final Task activeTask : newTasks) {
-                addNewActiveTask(activeTask);
+                addActiveTask(activeTask);
             }
         }
     }
 
     @Override
-    public void addNewActiveTask(final Task task) {
+    public void addActiveTask(final Task task) {
         final TaskId taskId = task.id();
 
         if (activeTasksPerId.containsKey(taskId)) {
@@ -182,7 +182,7 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public void addNewStandbyTasks(final Collection<Task> newTasks) {
+    public void addStandbyTasks(final Collection<Task> newTasks) {
         if (!newTasks.isEmpty()) {
             for (final Task standbyTask : newTasks) {
                 final TaskId taskId = standbyTask.id();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index bba358e46b..909f5613ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -55,11 +55,11 @@ public interface TasksRegistry {
 
     void addPendingTaskToInit(final Collection<Task> tasks);
 
-    void addNewActiveTasks(final Collection<Task> newTasks);
+    void addActiveTasks(final Collection<Task> tasks);
 
-    void addNewActiveTask(final Task task);
+    void addActiveTask(final Task task);
 
-    void addNewStandbyTasks(final Collection<Task> newTasks);
+    void addStandbyTasks(final Collection<Task> tasks);
 
     void removeTask(final Task taskToRemove);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 91485e5d3a..258d193f8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -128,7 +128,6 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
-        stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
         final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build();
         final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 22222a0ead..e1cc2c48a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
+import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTasks;
 import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -596,7 +597,7 @@ public class TaskManagerTest {
 
         Mockito.verify(task).completeRestoration(noOpResetter);
         Mockito.verify(task).clearTaskTimeout();
-        Mockito.verify(tasks).addNewActiveTask(task);
+        Mockito.verify(tasks).addActiveTask(task);
         verify(consumer);
     }
 
@@ -614,7 +615,7 @@ public class TaskManagerTest {
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), Mockito.eq(timeoutException));
-        Mockito.verify(tasks, never()).addNewActiveTask(task);
+        Mockito.verify(tasks, never()).addActiveTask(task);
         Mockito.verify(task, never()).clearTaskTimeout();
         verify(consumer);
     }
@@ -872,7 +873,7 @@ public class TaskManagerTest {
 
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(tasks).addNewActiveTask(taskToTransitToRunning);
+        Mockito.verify(tasks).addActiveTask(taskToTransitToRunning);
         Mockito.verify(stateUpdater).add(recycledStandbyTask);
         Mockito.verify(stateUpdater).add(recycledStandbyTask);
         Mockito.verify(taskToCloseClean).closeClean();
@@ -2699,6 +2700,86 @@ public class TaskManagerTest {
         verify(activeTaskCreator);
     }
 
+    @Test
+    public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() {
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final StreamTask failedStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.RESTORING).build();
+        final StandbyTask failedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+            .inState(State.RUNNING).build();
+        when(stateUpdater.drainExceptionsAndFailedTasks())
+            .thenReturn(Arrays.asList(
+                new ExceptionAndTasks(mkSet(failedStatefulTask), new RuntimeException()),
+                new ExceptionAndTasks(mkSet(failedStandbyTask), new RuntimeException()))
+            );
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
+        activeTaskCreator.closeThreadProducerIfNeeded();
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        replay(activeTaskCreator);
+
+        taskManager.shutdown(true);
+
+        verify(activeTaskCreator);
+        Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+        Mockito.verify(failedStatefulTask).prepareCommit();
+        Mockito.verify(failedStatefulTask).suspend();
+        Mockito.verify(failedStatefulTask).closeDirty();
+        Mockito.verify(failedStandbyTask).prepareCommit();
+        Mockito.verify(failedStandbyTask).suspend();
+        Mockito.verify(failedStandbyTask).closeDirty();
+    }
+
+    @Test
+    public void shouldShutDownStateUpdaterAndAddRestoredTasksToTaskRegistry() {
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.RESTORING).build();
+        final StreamTask statefulTask2 = statefulTask(taskId02, taskId02ChangelogPartitions)
+            .inState(State.RESTORING).build();
+        final Set<StreamTask> restoredActiveTasks = mkSet(statefulTask1, statefulTask2);
+        final Set<Task> restoredTasks = restoredActiveTasks.stream().map(t -> (Task) t).collect(Collectors.toSet());
+        when(stateUpdater.drainRestoredActiveTasks(Duration.ZERO)).thenReturn(restoredActiveTasks);
+        when(tasks.activeTasks()).thenReturn(restoredTasks);
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
+        activeTaskCreator.closeThreadProducerIfNeeded();
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        replay(activeTaskCreator);
+
+        taskManager.shutdown(true);
+
+        verify(activeTaskCreator);
+        Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+        Mockito.verify(tasks).addActiveTasks(restoredTasks);
+        Mockito.verify(statefulTask1).closeClean();
+        Mockito.verify(statefulTask2).closeClean();
+    }
+
+    @Test
+    public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() {
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final StreamTask removedStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.RESTORING).build();
+        final StandbyTask removedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+            .inState(State.RUNNING).build();
+        when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedStandbyTask, removedStatefulTask));
+        when(tasks.activeTasks()).thenReturn(mkSet(removedStatefulTask));
+        when(tasks.allTasks()).thenReturn(mkSet(removedStatefulTask, removedStandbyTask));
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
+        activeTaskCreator.closeThreadProducerIfNeeded();
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        replay(activeTaskCreator);
+
+        taskManager.shutdown(true);
+
+        verify(activeTaskCreator);
+        Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+        Mockito.verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
+        Mockito.verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
+        Mockito.verify(removedStatefulTask).closeClean();
+        Mockito.verify(removedStandbyTask).closeClean();
+    }
+
     @Test
     public void shouldInitializeNewActiveTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index e51d4ea0f1..be1f5c4972 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -52,8 +52,8 @@ public class TasksTest {
         final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build();
         final StreamTask statelessTask = statelessTask(TASK_1_0).build();
 
-        tasks.addNewActiveTasks(mkSet(statefulTask, statelessTask));
-        tasks.addNewStandbyTasks(Collections.singletonList(standbyTask));
+        tasks.addActiveTasks(mkSet(statefulTask, statelessTask));
+        tasks.addStandbyTasks(Collections.singletonList(standbyTask));
 
         assertEquals(statefulTask, tasks.task(statefulTask.id()));
         assertEquals(statelessTask, tasks.task(statelessTask.id()));