You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/07 01:16:36 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces

guozhangwang commented on code in PR #12338:
URL: https://github.com/apache/kafka/pull/12338#discussion_r915355254


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -83,7 +83,6 @@ class DefaultStateUpdaterTest {
     private final Time time = new MockTime(1L);
     private final StreamsConfig config = new StreamsConfig(configProps());
     private final ChangelogReader changelogReader = mock(ChangelogReader.class);
-    private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };

Review Comment:
   This is a leftover from a previous commit; we do not need it anymore.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -34,29 +33,41 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
     private final Logger log;
     private final TopologyMetadata topologyMetadata;
 
-    private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
-    private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
-    private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
-
     // TODO: change type to `StreamTask`
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
+
+    // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
+    // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
+    // we receive a new assignment and they are revoked from the thread.
+
+    // Tasks may have been assigned but not yet created because:
+    // 1. They are for a NamedTopology that is yet known by this host.
+    // 2. They are to be recycled from an existing restoring task yet to be returned from the task updater.
+    //
+    // When that occurs we stash these pending tasks until either they are finally clear to be created,
+    // or they are revoked from a new assignment.
+    private final Map<TaskId, Set<TopicPartition>> pendingActiveTasks = new HashMap<>();

Review Comment:
   This is mainly for case 1) in the description.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -182,7 +159,7 @@ public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
                 partitions
             );
 
-            final InternalProcessorContext context = new ProcessorContextImpl(
+            final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(

Review Comment:
   Minor cleanup.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -285,34 +286,46 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
         final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
         final Comparator<Task> byId = Comparator.comparing(Task::id);
-        final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
 
-        // first rectify all existing tasks
+        tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet());
+
+        // first rectify all existing tasks:
+        // 1. for tasks that are already owned, just resume and skip re-creating them
+        // 2. for tasks that have changed active/standby status, just recycle and skip re-creating them
+        // 3. otherwise, close them since they are no longer owned.
         for (final Task task : tasks.allTasks()) {
-            if (activeTasks.containsKey(task.id()) && task.isActive()) {
-                tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                activeTasksToCreate.remove(task.id());
-            } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
-                tasks.updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
-                standbyTasksToCreate.remove(task.id());
-            } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
-                // check for tasks that were owned previously but have changed active/standby status
-                tasksToRecycle.add(task);
+            final TaskId taskId = task.id();
+            if (activeTasksToCreate.containsKey(taskId)) {
+                if (task.isActive()) {
+                    tasks.updateInputPartitionsAndResume(task, activeTasksToCreate.get(taskId));
+                } else {
+                    tasksToRecycle.put(task, activeTasksToCreate.get(taskId));
+                }
+                activeTasksToCreate.remove(taskId);
+            } else if (standbyTasksToCreate.containsKey(taskId)) {
+                if (!task.isActive()) {
+                    tasks.updateInputPartitionsAndResume(task, standbyTasksToCreate.get(taskId));
+                } else {
+                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
+                }
+                standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
 
+        tasks.addActivePendingTasks(pendingTasksToCreate(activeTasksToCreate));
+        tasks.addStandbyPendingTasks(pendingTasksToCreate(standbyTasksToCreate));
+
         // close and recycle those tasks
-        handleCloseAndRecycle(
+        closeAndRecycleTasks(
             tasksToRecycle,
             tasksToCloseClean,
             tasksToCloseDirty,
-            activeTasksToCreate,

Review Comment:
   This is a cleanup: we construct the pending tasks to recycle outside of `handleCloseAndRecycle`, so that we no longer need to pass in `activeTasksToCreate` / `standbyTasksToCreate`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -34,29 +33,41 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
     private final Logger log;
     private final TopologyMetadata topologyMetadata;
 
-    private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());

Review Comment:
   This is a minor cleanup: only return read-only tasks upon calling.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -68,11 +67,6 @@ class ActiveTaskCreator {
     private final Map<TaskId, StreamsProducer> taskProducers;
     private final ProcessingMode processingMode;
 
-    // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
-    // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
-    // we receive a new assignment and they are revoked from the thread.
-    private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();

Review Comment:
   This is for case 2) in description.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -859,16 +892,6 @@ void closeAndCleanUpTasks(final Collection<Task> activeTasks, final Collection<T
             closeTaskDirty(task);
         }
 
-        // TODO: change type to `StreamTask`

Review Comment:
   This is for case 3) in the description: this is already done in `closeTaskClean`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org