Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/12 13:46:59 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

cadonna commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r944439546


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -316,20 +313,22 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             classifyTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
         }
 
-        tasks.addPendingActiveTasks(pendingTasksToCreate(activeTasksToCreate));
-        tasks.addPendingStandbyTasks(pendingTasksToCreate(standbyTasksToCreate));
+        tasks.purgePendingTasksToCreate();

Review Comment:
   nit: `purgePendingTasksToCreate()` -> `clearPendingTasksToCreate()`



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldHandleRemovedTasksFromStateUpdater() {
+        // tasks to recycle
+        final StreamTask task00 = mock(StreamTask.class);
+        final StandbyTask task01 = mock(StandbyTask.class);
+        final StandbyTask task00Converted = mock(StandbyTask.class);
+        final StreamTask task01Converted = mock(StreamTask.class);
+        // task to close
+        final StreamTask task02 = mock(StreamTask.class);
+        // task to update inputs
+        final StreamTask task03 = mock(StreamTask.class);
+        when(task00.id()).thenReturn(taskId00);
+        when(task01.id()).thenReturn(taskId01);
+        when(task02.id()).thenReturn(taskId02);
+        when(task03.id()).thenReturn(taskId03);
+        when(task00.inputPartitions()).thenReturn(taskId00Partitions);
+        when(task01.inputPartitions()).thenReturn(taskId01Partitions);
+        when(task02.inputPartitions()).thenReturn(taskId02Partitions);
+        when(task03.inputPartitions()).thenReturn(taskId03Partitions);
+        when(task00.isActive()).thenReturn(true);
+        when(task01.isActive()).thenReturn(false);
+        when(task02.isActive()).thenReturn(true);
+        when(task03.isActive()).thenReturn(true);
+        when(task00.state()).thenReturn(State.RESTORING);
+        when(task01.state()).thenReturn(State.RUNNING);
+        when(task02.state()).thenReturn(State.RESTORING);
+        when(task03.state()).thenReturn(State.RESTORING);

Review Comment:
   We should extend the mock task builder in `StreamsTestUtils` to allow the specification of the input partitions. This setup code would get much simpler with that. I can extend the utils once this PR is merged.
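
A rough sketch of the shape such a builder could take, so the four `when(...)` stubs per task collapse into one chained call. Everything here is hypothetical and simplified for illustration: `SketchTask`, `SketchTaskBuilder`, and the method names `withInputPartitions` and `inState` are assumptions, not the existing `StreamsTestUtils` API, and partitions and states are modeled as plain strings instead of `TopicPartition` and `Task.State`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified view of a task; NOT the real Task interface.
interface SketchTask {
    String id();
    Set<String> inputPartitions();
    boolean isActive();
    String state();
}

// Hypothetical fluent builder for stub tasks (assumed names throughout).
final class SketchTaskBuilder {
    private final String id;
    private final boolean active;
    private Set<String> inputPartitions = Collections.emptySet();
    private String state = "CREATED";

    private SketchTaskBuilder(final String id, final boolean active) {
        this.id = id;
        this.active = active;
    }

    static SketchTaskBuilder activeTask(final String id) {
        return new SketchTaskBuilder(id, true);
    }

    static SketchTaskBuilder standbyTask(final String id) {
        return new SketchTaskBuilder(id, false);
    }

    // Defensive copy so later changes to the caller's set do not leak into the stub.
    SketchTaskBuilder withInputPartitions(final Set<String> partitions) {
        this.inputPartitions = Collections.unmodifiableSet(new HashSet<>(partitions));
        return this;
    }

    SketchTaskBuilder inState(final String state) {
        this.state = state;
        return this;
    }

    // Snapshot the builder fields so the built stub is independent of the builder.
    SketchTask build() {
        final String builtId = id;
        final boolean builtActive = active;
        final Set<String> builtPartitions = inputPartitions;
        final String builtState = state;
        return new SketchTask() {
            @Override public String id() { return builtId; }
            @Override public Set<String> inputPartitions() { return builtPartitions; }
            @Override public boolean isActive() { return builtActive; }
            @Override public String state() { return builtState; }
        };
    }
}
```

With something along these lines, the setup for `task00` would shrink to a single expression such as `SketchTaskBuilder.activeTask("0_0").withInputPartitions(partitions).inState("RESTORING").build()`.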



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -641,9 +642,9 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
                 }
             }
         } else {
-            for (final Task task : tasks.drainPendingTaskToRestore()) {
-                stateUpdater.add(task);
-            }
+            addTaskstoStateUpdater();

Review Comment:
   `addTaskstoStateUpdater()` -> `addTasksToStateUpdater()`



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldHandleRemovedTasksFromStateUpdater() {

Review Comment:
   We also need a test to verify that new tasks are initialized and added to the state updater.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -43,35 +43,48 @@
 class Tasks {
     private final Logger log;
 
+    // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
     private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
 
     // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
     // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
     // we receive a new assignment and they are revoked from the thread.
-
-    // Tasks may have been assigned but not yet created because:
-    // 1. They are for a NamedTopology that is yet known by this host.
-    // 2. They are to be recycled from an existing restoring task yet to be returned from the state updater.
-    //
-    // When that occurs we stash these pending tasks until either they are finally clear to be created,
-    // or they are revoked from a new assignment.
     private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
     private final Set<TaskId> pendingTasksToClose = new HashSet<>();
 
+    // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
 
     Tasks(final LogContext logContext) {
         this.log = logContext.logger(getClass());
     }
 
-    void purgePendingTasksToCreate(final Set<TaskId> assignedActiveTasks, final Set<TaskId> assignedStandbyTasks) {
-        pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks);
-        pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks);
+    void purgePendingTasksToCreate() {
+        pendingActiveTasksToCreate.clear();
+        pendingStandbyTasksToCreate.clear();
+    }
+
+    Map<TaskId, Set<TopicPartition>> drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {

Review Comment:
   Could you provide unit tests for this method?
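
To illustrate the behavior such a test would pin down, here is a minimal, self-contained sketch of the drain semantics: entries whose topology is in `currentTopologies` are returned and removed, everything else stays pending. It does not use the real `Tasks`, `TaskId`, or `TopicPartition` classes; `SketchTaskId` and `PendingTasksSketch` are hypothetical stand-ins, partitions are modeled as strings, and a stream filter replaces the `filterMap` helper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for TaskId: only the fields this sketch needs.
final class SketchTaskId {
    final String topologyName;
    final int partition;

    SketchTaskId(final String topologyName, final int partition) {
        this.topologyName = topologyName;
        this.partition = partition;
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof SketchTaskId)) {
            return false;
        }
        final SketchTaskId other = (SketchTaskId) o;
        return partition == other.partition && Objects.equals(topologyName, other.topologyName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topologyName, partition);
    }
}

// Hypothetical stand-in for the pending-task bookkeeping in Tasks.
final class PendingTasksSketch {
    final Map<SketchTaskId, Set<String>> pendingActiveTasksToCreate = new HashMap<>();

    // Returns the pending tasks that belong to a known topology and removes them
    // from the pending map, so a second call does not return them again.
    Map<SketchTaskId, Set<String>> drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {
        final Map<SketchTaskId, Set<String>> drained = pendingActiveTasksToCreate.entrySet().stream()
            .filter(entry -> currentTopologies.contains(entry.getKey().topologyName))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        pendingActiveTasksToCreate.keySet().removeAll(drained.keySet());

        return drained;
    }
}
```

A unit test for the real method could check the same three things: the returned map contains only tasks of the given topologies, those tasks are gone from the pending map afterwards, and tasks of unknown topologies remain pending.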



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -316,20 +313,22 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             classifyTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
         }
 
-        tasks.addPendingActiveTasks(pendingTasksToCreate(activeTasksToCreate));
-        tasks.addPendingStandbyTasks(pendingTasksToCreate(standbyTasksToCreate));
+        tasks.purgePendingTasksToCreate();
+        tasks.addPendingActiveTasksToCreate(pendingTasksToCreate(activeTasksToCreate));
+        tasks.addPendingStandbyTasksToCreate(pendingTasksToCreate(standbyTasksToCreate));

Review Comment:
   I was actually proposing to move this block before line 306.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -43,35 +43,48 @@
 class Tasks {
     private final Logger log;
 
+    // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
     private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
 
     // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
     // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
     // we receive a new assignment and they are revoked from the thread.
-
-    // Tasks may have been assigned but not yet created because:
-    // 1. They are for a NamedTopology that is yet known by this host.
-    // 2. They are to be recycled from an existing restoring task yet to be returned from the state updater.
-    //
-    // When that occurs we stash these pending tasks until either they are finally clear to be created,
-    // or they are revoked from a new assignment.
     private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
     private final Set<TaskId> pendingTasksToClose = new HashSet<>();
 
+    // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
 
     Tasks(final LogContext logContext) {
         this.log = logContext.logger(getClass());
     }
 
-    void purgePendingTasksToCreate(final Set<TaskId> assignedActiveTasks, final Set<TaskId> assignedStandbyTasks) {
-        pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks);
-        pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks);
+    void purgePendingTasksToCreate() {
+        pendingActiveTasksToCreate.clear();
+        pendingStandbyTasksToCreate.clear();
+    }
+
+    Map<TaskId, Set<TopicPartition>> drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {
+        final Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies =
+            filterMap(pendingActiveTasksToCreate, t -> currentTopologies.contains(t.getKey().topologyName()));
+
+        pendingActiveTasksToCreate.keySet().removeAll(pendingActiveTasksForTopologies.keySet());
+
+        return pendingActiveTasksForTopologies;
+    }
+
+    Map<TaskId, Set<TopicPartition>> pendingStandbyTasksForTopologies(final Set<String> currentTopologies) {

Review Comment:
   Could you provide unit tests for this method?
   
   I would also rename this method to `drainPendingStandbyTasksForTopologies` to be consistent with the previous method.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldHandleRemovedTasksFromStateUpdater() {
+        // tasks to recycle
+        final StreamTask task00 = mock(StreamTask.class);
+        final StandbyTask task01 = mock(StandbyTask.class);
+        final StandbyTask task00Converted = mock(StandbyTask.class);
+        final StreamTask task01Converted = mock(StreamTask.class);
+        // task to close
+        final StreamTask task02 = mock(StreamTask.class);
+        // task to update inputs
+        final StreamTask task03 = mock(StreamTask.class);
+        when(task00.id()).thenReturn(taskId00);
+        when(task01.id()).thenReturn(taskId01);
+        when(task02.id()).thenReturn(taskId02);
+        when(task03.id()).thenReturn(taskId03);
+        when(task00.inputPartitions()).thenReturn(taskId00Partitions);
+        when(task01.inputPartitions()).thenReturn(taskId01Partitions);
+        when(task02.inputPartitions()).thenReturn(taskId02Partitions);
+        when(task03.inputPartitions()).thenReturn(taskId03Partitions);
+        when(task00.isActive()).thenReturn(true);
+        when(task01.isActive()).thenReturn(false);
+        when(task02.isActive()).thenReturn(true);
+        when(task03.isActive()).thenReturn(true);
+        when(task00.state()).thenReturn(State.RESTORING);
+        when(task01.state()).thenReturn(State.RUNNING);
+        when(task02.state()).thenReturn(State.RESTORING);
+        when(task03.state()).thenReturn(State.RESTORING);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01, task02, task03));
+
+        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
+            .andStubReturn(task01Converted);
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
+        expectLastCall().times(2);
+        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
+            .andStubReturn(task00Converted);
+        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+        consumer.resume(anyObject());
+        expectLastCall().anyTimes();
+        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);

Review Comment:
   Why are you mixing Mockito with EasyMock? Since this is new code you could just use Mockito.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldHandleRemovedTasksFromStateUpdater() {

Review Comment:
   The exceptional cases are missing, for example when `newTask.initializeIfNeeded()` throws an exception or when closing a task throws an exception.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldHandleRemovedTasksFromStateUpdater() {
+        // tasks to recycle
+        final StreamTask task00 = mock(StreamTask.class);
+        final StandbyTask task01 = mock(StandbyTask.class);
+        final StandbyTask task00Converted = mock(StandbyTask.class);
+        final StreamTask task01Converted = mock(StreamTask.class);
+        // task to close
+        final StreamTask task02 = mock(StreamTask.class);
+        // task to update inputs
+        final StreamTask task03 = mock(StreamTask.class);

Review Comment:
   I would add a separate test for each of these task lifecycle actions. This makes the tests easier to read.


