Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/09 17:26:37 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12270: KAFKA-10199: Implement removing active and standby tasks from the state updater

guozhangwang commented on code in PR #12270:
URL: https://github.com/apache/kafka/pull/12270#discussion_r893752714


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
 
         private void addTask(final Task task) {
             if (isStateless(task)) {
-                log.debug("Stateless active task " + task.id() + " was added to the state updater");
                 addTaskToRestoredTasks((StreamTask) task);
+                log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
             } else {
+                updatingTasks.put(task.id(), task);
                 if (task.isActive()) {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Stateful active task " + task.id() + " was added to the state updater");
+                    log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
                     changelogReader.enforceRestoreActive();
                 } else {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Standby task " + task.id() + " was added to the state updater");
+                    log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
                     if (updatingTasks.size() == 1) {
                         changelogReader.transitToUpdateStandby();
                     }
                 }
             }
         }
 
+        private void removeTask(final TaskId taskId) {
+            final Task task = updatingTasks.remove(taskId);

Review Comment:
   Not a comment: one of the bug fixes we want to piggy-back on the restoration work is to write a new checkpoint when we stop restoring a task; otherwise the restoration progress so far would be lost. The restore callback should be triggered as well (KAFKA-10575).
   
   I will try to add this in a separate PR; it would also be a good exercise in how the state updater could interact with the processor state manager.
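
   For illustration only, a minimal sketch of the "checkpoint when we stop restoring" idea. Every name here (`CheckpointOnRemoveSketch`, `RestoringTask`, `restoredOffset`, the `checkpoints` map) is hypothetical and stands in for the real task and processor state manager types; the restore callback is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; none of these types exist in Kafka Streams.
final class CheckpointOnRemoveSketch {

    interface RestoringTask {
        String id();
        long restoredOffset(); // changelog offset restored so far
    }

    private final Map<String, RestoringTask> updatingTasks = new HashMap<>();

    // Stand-in for a checkpoint file: task ID -> last restored changelog offset.
    final Map<String, Long> checkpoints = new HashMap<>();

    void add(final RestoringTask task) {
        updatingTasks.put(task.id(), task);
    }

    // Returns true if the task was updating and has been removed.
    boolean remove(final String taskId) {
        final RestoringTask task = updatingTasks.remove(taskId);
        if (task == null) {
            return false; // not updating, nothing to checkpoint
        }
        // Persist the progress before handing the task back, so that a later
        // restoration can resume from this offset instead of starting over.
        checkpoints.put(task.id(), task.restoredOffset());
        return true;
    }
}
```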



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -375,52 +384,42 @@ public Set<StreamTask> getRestoredActiveTasks(final Duration timeout) {
     }
 
     @Override
-    public List<ExceptionAndTasks> getFailedTasksAndExceptions() {
+    public Set<Task> drainRemovedTasks() {

Review Comment:
   When we add the logic for recycling a task, which is still to be done in the task manager, we would need two round-trips: first remove the task as active/standby, then, after recycling it, add the new task as standby/active. I'm still trying to flesh out the details here. In case we also need a timeout for removed tasks, similar to the one for restored active tasks, this data structure may need to change from a blocking queue to a lock + condition + queue.
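
   To make the lock + condition + queue idea concrete, here is a minimal sketch of a queue whose drain waits up to a timeout for elements. `TimedDrainQueue` and its methods are hypothetical names for illustration, not code from this PR, and interrupt handling is simplified.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper for illustration; not part of DefaultStateUpdater.
final class TimedDrainQueue<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<T> queue = new ArrayDeque<>();

    // Called by the producing thread (for example the state updater thread).
    void add(final T element) {
        lock.lock();
        try {
            queue.add(element);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Waits up to the timeout for at least one element, then drains everything available.
    Set<T> drain(final Duration timeout) {
        long remainingNanos = timeout.toNanos();
        lock.lock();
        try {
            try {
                // Loop to guard against spurious wake-ups; awaitNanos returns the time left.
                while (queue.isEmpty() && remainingNanos > 0) {
                    remainingNanos = notEmpty.awaitNanos(remainingNanos);
                }
            } catch (final InterruptedException e) {
                // Simplification: keep the interrupt flag set and return what is available.
                Thread.currentThread().interrupt();
            }
            final Set<T> result = new HashSet<>(queue);
            queue.clear();
            return result;
        } finally {
            lock.unlock();
        }
    }
}
```

   Unlike a plain `drainTo` on a blocking queue, which returns immediately, this lets the caller wait for a removal to complete, which is what the recycle round-trip would need.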



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -375,52 +384,42 @@ public Set<StreamTask> getRestoredActiveTasks(final Duration timeout) {
     }
 
     @Override
-    public List<ExceptionAndTasks> getFailedTasksAndExceptions() {
+    public Set<Task> drainRemovedTasks() {
+        final List<Task> result = new ArrayList<>();
+        removedTasks.drainTo(result);
+        return new HashSet<>(result);
+    }
+
+    @Override
+    public List<ExceptionAndTasks> drainExceptionsAndFailedTasks() {
         final List<ExceptionAndTasks> result = new ArrayList<>();
-        failedTasks.drainTo(result);
+        exceptionsAndFailedTasks.drainTo(result);
         return result;
     }
 
-    @Override
-    public Set<Task> getAllTasks() {
-        tasksAndActionsLock.lock();
+    public Set<StandbyTask> getUpdatingStandbyTasks() {
+        return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks()));
+    }
+
+    public Set<Task> getUpdatingTasks() {
+        return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingTasks()));
+    }
+
+    public Set<StreamTask> getRestoredActiveTasks() {
         restoredActiveTasksLock.lock();
         try {
-            final Set<Task> allTasks = new HashSet<>();
-            allTasks.addAll(tasksAndActions.stream()
-                .filter(t -> t.action == Action.ADD)
-                .map(t -> t.task)
-                .collect(Collectors.toList())
-            );
-            allTasks.addAll(stateUpdaterThread.getAllUpdatingTasks());
-            allTasks.addAll(restoredActiveTasks);
-            return Collections.unmodifiableSet(allTasks);
+            return Collections.unmodifiableSet(new HashSet<>(restoredActiveTasks));
         } finally {
             restoredActiveTasksLock.unlock();
-            tasksAndActionsLock.unlock();
         }
     }
 
-    @Override
-    public Set<StandbyTask> getStandbyTasks() {
-        tasksAndActionsLock.lock();
-        try {
-            final Set<StandbyTask> standbyTasks = new HashSet<>();
-            standbyTasks.addAll(tasksAndActions.stream()
-                .filter(t -> t.action == Action.ADD)
-                .filter(t -> !t.task.isActive())
-                .map(t -> (StandbyTask) t.task)
-                .collect(Collectors.toList())
-            );
-            standbyTasks.addAll(getUpdatingStandbyTasks());
-            return Collections.unmodifiableSet(standbyTasks);
-        } finally {
-            tasksAndActionsLock.unlock();
-        }
+    public List<ExceptionAndTasks> getExceptionsAndFailedTasks() {

Review Comment:
   I feel some of the getters here will not end up being needed in non-testing code, since we have the corresponding `drainXX` functions that are declared in the interface and would be used there. If that turns out to be true, let's cluster them together and add a comment that they are for testing only.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -16,65 +16,101 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 public interface StateUpdater {
 
     class ExceptionAndTasks {
-        public final Set<Task> tasks;
-        public final RuntimeException exception;
+        private final Set<Task> tasks;
+        private final RuntimeException exception;
 
         public ExceptionAndTasks(final Set<Task> tasks, final RuntimeException exception) {
-            this.tasks = tasks;
-            this.exception = exception;
+            this.tasks = Objects.requireNonNull(tasks);
+            this.exception = Objects.requireNonNull(exception);
+        }
+
+        public Set<Task> tasks() {
+            return Collections.unmodifiableSet(tasks);
+        }
+
+        public RuntimeException exception() {
+            return exception;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (!(o instanceof ExceptionAndTasks)) return false;
+            final ExceptionAndTasks that = (ExceptionAndTasks) o;
+            return tasks.equals(that.tasks) && exception.equals(that.exception);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tasks, exception);
         }
     }
 
     /**
      * Adds a task (active or standby) to the state updater.
      *
+     * This method does not block until the task is added to the state updater.
+     *
      * @param task task to add
      */
     void add(final Task task);
 
     /**
-     * Removes a task (active or standby) from the state updater.
+     * Removes a task (active or standby) from the state updater and adds the removed task to the removed tasks.
+     *
+     * This method does not block until the removed task is removed from the state updater.
      *
-     * @param task task ro remove
+     * The task to be removed is not removed from the restored active tasks and the failed tasks.
+     * Stateless tasks will never be added to the removed tasks since they are immediately added to the
+     * restored active tasks.
+     *
+     * @param taskId ID of the task to remove
      */
-    void remove(final Task task);
+    void remove(final TaskId taskId);
 
     /**
-     * Gets restored active tasks from state restoration/update
+     * Drains the restored active tasks from the state updater.
+     *
+     * The returned active tasks are removed from the state updater.
      *
      * @param timeout duration how long the calling thread should wait for restored active tasks
      *
      * @return set of active tasks with up-to-date states
      */
-    Set<StreamTask> getRestoredActiveTasks(final Duration timeout);
+    Set<StreamTask> drainRestoredActiveTasks(final Duration timeout);
 
-    /**
-     * Gets failed tasks and the corresponding exceptions
-     *
-     * @return list of failed tasks and the corresponding exceptions
-     */
-    List<ExceptionAndTasks> getFailedTasksAndExceptions();
 
     /**
-     * Get all tasks (active and standby) that are managed by the state updater.
+     * Drains the removed tasks (active and standbys) from the state updater.
+     *
+     * Removed tasks returned by this method are tasks extraordinarily removed from the state updater. These do not

Review Comment:
   I like the word "extraordinarily" :)
   
   Jokes aside, I have a slightly different thought about the semantics here, i.e. whether the drained removed tasks and the restored/failed tasks should be mutually exclusive or may overlap, mainly in terms of how easily the caller could handle the overlapping scenarios. I left an earlier comment above.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -293,47 +321,165 @@ public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreA
 
         stateUpdater.add(task3);
 
-        verifyRestoredActiveTasks(task3);
+        verifyRestoredActiveTasks(task1, task3);
         verify(task3).completeRestoration(offsetResetter);
         orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
         orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
     }
 
+    @Test
+    public void shouldRemoveActiveStatefulTask() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldRemoveStatefulTask(task);
+    }
+
+    @Test
+    public void shouldRemoveStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldRemoveStatefulTask(task);
+    }
+
+    private void shouldRemoveStatefulTask(final Task task) throws Exception {
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        stateUpdater.add(task);
+
+        stateUpdater.remove(TASK_0_0);
+
+        verifyRemovedTasks(task);
+        verifyRestoredActiveTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+        verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0));
+    }
+
+    @Test
+    public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+    }
+
+    @Test
+    public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
+        final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
+        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+    }
+
+    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.remove(task.id());
+        stateUpdater.remove(controlTask.id());
+
+        verifyRemovedTasks(controlTask);
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromFailedTasks(task);
+    }
+
+    @Test
+    public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromFailedTasks(task);
+    }
+
+    private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StreamsException streamsException = new StreamsException("Something happened", task.id());
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        doNothing()
+            .doThrow(streamsException)
+            .doNothing()
+            .when(changelogReader).restore(anyMap());
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task), streamsException);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+
+        stateUpdater.remove(task.id());
+        stateUpdater.remove(controlTask.id());
+
+        verifyRemovedTasks(controlTask);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+        verifyUpdatingTasks();
+        verifyRestoredActiveTasks();
+    }
+
+    @Test
+    public void shouldDrainRemovedTasks() throws Exception {
+        assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        stateUpdater.add(task1);
+        stateUpdater.remove(task1.id());
+
+        verifyDrainingRemovedTasks(task1);
+
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
+        stateUpdater.add(task2);
+        stateUpdater.remove(task2.id());
+        stateUpdater.add(task3);
+        stateUpdater.remove(task3.id());
+        stateUpdater.add(task4);
+        stateUpdater.remove(task4.id());
+
+        verifyDrainingRemovedTasks(task2, task3, task4);
+    }
+
     @Test
     public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
         final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
         final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
-        final String expectedMessage = "The Streams were crossed!";
-        final StreamsException expectedStreamsException = new StreamsException(expectedMessage);
+        final String exceptionMessage = "The Streams were crossed!";
+        final StreamsException streamsException = new StreamsException(exceptionMessage);
         final Map<TaskId, Task> updatingTasks = mkMap(
             mkEntry(task1.id(), task1),
             mkEntry(task2.id(), task2)
         );
-        doNothing().doThrow(expectedStreamsException).doNothing().when(changelogReader).restore(updatingTasks);
+        doNothing().doThrow(streamsException).doNothing().when(changelogReader).restore(updatingTasks);
 
         stateUpdater.add(task1);
         stateUpdater.add(task2);
 
-        final List<ExceptionAndTasks> failedTasks = getFailedTasks(1);
-        assertEquals(1, failedTasks.size());
-        final ExceptionAndTasks actualFailedTasks = failedTasks.get(0);
-        assertEquals(2, actualFailedTasks.tasks.size());
-        assertTrue(actualFailedTasks.tasks.containsAll(Arrays.asList(task1, task2)));
-        assertTrue(actualFailedTasks.exception instanceof StreamsException);
-        final StreamsException actualException = (StreamsException) actualFailedTasks.exception;
-        assertFalse(actualException.taskId().isPresent());
-        assertEquals(expectedMessage, actualException.getMessage());
-        assertTrue(stateUpdater.getAllTasks().isEmpty());

Review Comment:
   Yup, agreed.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
 
         private void addTask(final Task task) {
             if (isStateless(task)) {
-                log.debug("Stateless active task " + task.id() + " was added to the state updater");
                 addTaskToRestoredTasks((StreamTask) task);
+                log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
             } else {
+                updatingTasks.put(task.id(), task);
                 if (task.isActive()) {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Stateful active task " + task.id() + " was added to the state updater");
+                    log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
                     changelogReader.enforceRestoreActive();
                 } else {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Standby task " + task.id() + " was added to the state updater");
+                    log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
                     if (updatingTasks.size() == 1) {
                         changelogReader.transitToUpdateStandby();
                     }
                 }
             }
         }
 
+        private void removeTask(final TaskId taskId) {
+            final Task task = updatingTasks.remove(taskId);
+            if (task != null) {
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not updating.");

Review Comment:
   This is a meta comment: for tasks that have already been restored or have failed, should we still include them in the removed tasks returned by the `drain` function?
   
   The reason I'm wondering is that the caller of `updater.remove` would likely expect to eventually see the task show up in a future `drain` call (one example, again, is the recycle scenario). If we do not add them there, the caller's logic gets more complicated, since it also has to check the `restored` / `failed` sets from the updater to determine when the task has been removed completely.
   
   The downside, of course, is that a task can show up in multiple such channels, but I feel the caller's logic to "de-dup" such cases would be easy as long as there is a deterministic order for checking removed/completed/failed tasks from the updater.
   
   WDYT?
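
   As a rough sketch of the caller-side "de-dup" under the overlapping semantics: the caller checks the drained channels in a fixed order and the first channel that reports a task wins. The class name, the `Channel` enum, the use of string task IDs, and the failed, restored, removed order are all assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical caller-side helper; not part of the StateUpdater interface.
final class DrainedTaskClassifier {

    enum Channel { FAILED, RESTORED, REMOVED }

    // Assigns each task ID to exactly one channel, even if it was drained from several.
    static Map<String, Channel> classify(final Set<String> failed,
                                         final Set<String> restored,
                                         final Set<String> removed) {
        final Map<String, Channel> result = new LinkedHashMap<>();
        // Fixed checking order; putIfAbsent keeps the first channel seen for a task.
        failed.forEach(id -> result.putIfAbsent(id, Channel.FAILED));
        restored.forEach(id -> result.putIfAbsent(id, Channel.RESTORED));
        removed.forEach(id -> result.putIfAbsent(id, Channel.REMOVED));
        return result;
    }
}
```

   With this shape, a caller that requested a removal only has to wait until the task ID appears in the classified map, regardless of which channel reported it.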


