Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/24 02:27:08 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #12338: KAFKA-10199 [DO NOT REVIEW]: Cleanup TaskManager and Task interfaces

guozhangwang opened a new pull request, #12338:
URL: https://github.com/apache/kafka/pull/12338

   This PR is to be reviewed after #12337.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang closed pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces

Posted by GitBox <gi...@apache.org>.
guozhangwang closed pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces
URL: https://github.com/apache/kafka/pull/12338




[GitHub] [kafka] guozhangwang commented on pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12338:
URL: https://github.com/apache/kafka/pull/12338#issuecomment-1179434266

   Closing this PR in favor of https://github.com/apache/kafka/pull/12397




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12338:
URL: https://github.com/apache/kafka/pull/12338#discussion_r915368085


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -337,7 +369,7 @@ void addToSuccessfullyProcessed(final Task task) {
         successfullyProcessed.add(task);
     }
 
-    void removeTaskFromCuccessfullyProcessedBeforeClosing(final Task task) {
+    void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {

Review Comment:
   Fixed a typo in the function name.





[GitHub] [kafka] VforJuvenile commented on pull request #12338: KAFKA-10199 [DO NOT REVIEW]: Cleanup TaskManager and Task interfaces

Posted by GitBox <gi...@apache.org>.
VforJuvenile commented on PR #12338:
URL: https://github.com/apache/kafka/pull/12338#issuecomment-1165122558

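   This is an automatic vacation reply from QQ Mail.
    
   Hello, I am currently on vacation and cannot reply to your email personally. I will reply as soon as possible once my vacation ends.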




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12338:
URL: https://github.com/apache/kafka/pull/12338#discussion_r915355254


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -83,7 +83,6 @@ class DefaultStateUpdaterTest {
     private final Time time = new MockTime(1L);
     private final StreamsConfig config = new StreamsConfig(configProps());
     private final ChangelogReader changelogReader = mock(ChangelogReader.class);
-    private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };

Review Comment:
   This is a leftover from a previous commit; we do not need it anymore.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -34,29 +33,41 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
     private final Logger log;
     private final TopologyMetadata topologyMetadata;
 
-    private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
-    private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
-    private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
-
     // TODO: change type to `StreamTask`
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
+
+    // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
+    // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
+    // we receive a new assignment and they are revoked from the thread.
+
+    // Tasks may have been assigned but not yet created because:
+    // 1. They are for a NamedTopology that is yet known by this host.
+    // 2. They are to be recycled from an existing restoring task yet to be returned from the task updater.
+    //
+    // When that occurs we stash these pending tasks until either they are finally clear to be created,
+    // or they are revoked from a new assignment.
+    private final Map<TaskId, Set<TopicPartition>> pendingActiveTasks = new HashMap<>();

Review Comment:
   This is mainly for case 1) in the description.
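
The stash-then-create idea described in the code comment above can be sketched as follows. This is only an illustrative model, not the code in the PR: the class `PendingTaskStash` and its method names are hypothetical, and plain strings stand in for `TaskId` and `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for the pending-task bookkeeping in Tasks.
// Strings replace TaskId (keys) and TopicPartition (values).
class PendingTaskStash {
    private final Map<String, Set<String>> pendingActiveTasks = new HashMap<>();

    // Stash tasks that were assigned but cannot be created yet.
    void addPendingActiveTasks(final Map<String, Set<String>> pending) {
        pendingActiveTasks.putAll(pending);
    }

    // On a new assignment, drop stashed tasks that are no longer assigned.
    void purgePendingTasks(final Set<String> assignedActiveTasks) {
        pendingActiveTasks.keySet().retainAll(assignedActiveTasks);
    }

    // Remove and return the stashed tasks that are now clear to be created,
    // e.g. because their topology has become known (decided by the caller).
    Map<String, Set<String>> drainPendingTasks(final Predicate<String> canCreate) {
        final Map<String, Set<String>> ready = new HashMap<>();
        final Iterator<Map.Entry<String, Set<String>>> it =
            pendingActiveTasks.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, Set<String>> entry = it.next();
            if (canCreate.test(entry.getKey())) {
                ready.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
        return ready;
    }
}
```

In this sketch a stashed task leaves the map in exactly one of two ways: it is drained once it can be created, or it is purged when a later assignment no longer contains it.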



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -182,7 +159,7 @@ public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
                 partitions
             );
 
-            final InternalProcessorContext context = new ProcessorContextImpl(
+            final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(

Review Comment:
   Minor cleanup.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -285,34 +286,46 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
         final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
         final Comparator<Task> byId = Comparator.comparing(Task::id);
-        final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
 
-        // first rectify all existing tasks
+        tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet());
+
+        // first rectify all existing tasks:
+        // 1. for tasks that are already owned, just resume and skip re-creating them
+        // 2. for tasks that have changed active/standby status, just recycle and skip re-creating them
+        // 3. otherwise, close them since they are no longer owned.
         for (final Task task : tasks.allTasks()) {
-            if (activeTasks.containsKey(task.id()) && task.isActive()) {
-                tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                activeTasksToCreate.remove(task.id());
-            } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
-                tasks.updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
-                standbyTasksToCreate.remove(task.id());
-            } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
-                // check for tasks that were owned previously but have changed active/standby status
-                tasksToRecycle.add(task);
+            final TaskId taskId = task.id();
+            if (activeTasksToCreate.containsKey(taskId)) {
+                if (task.isActive()) {
+                    tasks.updateInputPartitionsAndResume(task, activeTasksToCreate.get(taskId));
+                } else {
+                    tasksToRecycle.put(task, activeTasksToCreate.get(taskId));
+                }
+                activeTasksToCreate.remove(taskId);
+            } else if (standbyTasksToCreate.containsKey(taskId)) {
+                if (!task.isActive()) {
+                    tasks.updateInputPartitionsAndResume(task, standbyTasksToCreate.get(taskId));
+                } else {
+                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
+                }
+                standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
 
+        tasks.addActivePendingTasks(pendingTasksToCreate(activeTasksToCreate));
+        tasks.addStandbyPendingTasks(pendingTasksToCreate(standbyTasksToCreate));
+
         // close and recycle those tasks
-        handleCloseAndRecycle(
+        closeAndRecycleTasks(
             tasksToRecycle,
             tasksToCloseClean,
             tasksToCloseDirty,
-            activeTasksToCreate,

Review Comment:
   This is a cleanup: we construct the tasks to recycle, together with their new input partitions, outside `handleCloseAndRecycle` (now renamed to `closeAndRecycleTasks`), so that we no longer need to pass in `activeTasksToCreate` / `standbyTasksToCreate`.
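
The three-way classification in the hunk above (resume, recycle, or close) can be tried out in isolation with a simplified model. The sketch below is an assumption-laden illustration rather than the PR's code: `AssignmentSketch`, `SimpleTask`, `Result` and `rectify` are hypothetical names, and strings stand in for `TaskId` and `TopicPartition`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of the loop in handleAssignment.
class AssignmentSketch {

    // Minimal task: an id plus an active/standby flag.
    static final class SimpleTask {
        final String id;
        final boolean active;

        SimpleTask(final String id, final boolean active) {
            this.id = id;
            this.active = active;
        }
    }

    static final class Result {
        final Set<String> resumed = new TreeSet<>();
        final Map<String, Set<String>> toRecycle = new HashMap<>();
        final Set<String> toCloseClean = new TreeSet<>();
    }

    // Classifies existing tasks and removes them from the "to create" maps,
    // so that only genuinely new tasks are left in those maps afterwards.
    static Result rectify(final Collection<SimpleTask> existing,
                          final Map<String, Set<String>> activeToCreate,
                          final Map<String, Set<String>> standbyToCreate) {
        final Result result = new Result();
        for (final SimpleTask task : existing) {
            final String id = task.id;
            if (activeToCreate.containsKey(id)) {
                if (task.active) {
                    result.resumed.add(id);                            // 1. already owned
                } else {
                    result.toRecycle.put(id, activeToCreate.get(id));  // 2. standby -> active
                }
                activeToCreate.remove(id);
            } else if (standbyToCreate.containsKey(id)) {
                if (!task.active) {
                    result.resumed.add(id);                            // 1. already owned
                } else {
                    result.toRecycle.put(id, standbyToCreate.get(id)); // 2. active -> standby
                }
                standbyToCreate.remove(id);
            } else {
                result.toCloseClean.add(id);                           // 3. no longer owned
            }
        }
        return result;
    }
}
```

Because each recycled task is stored with its new input partitions, the follow-up step in this model needs only the `toRecycle` map and not the two "to create" maps, which is the simplification the comment describes.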



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -34,29 +33,41 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
     private final Logger log;
     private final TopologyMetadata topologyMetadata;
 
-    private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());

Review Comment:
   This is a minor cleanup: the read-only task collections are no longer kept as fields; a read-only collection is only built and returned when it is requested.
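
One way to return a read-only collection on demand, instead of keeping a cached unmodifiable view as a field, is sketched below. This is an assumption about the general shape only: `ReadOnlyTasksSketch` and its methods are hypothetical, strings stand in for `TaskId` and `Task`, and the PR itself may build the result differently (the hunk imports `Utils.union` and `Utils.filterMap`).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: two internal maps, one read-only snapshot per call.
class ReadOnlyTasksSketch {
    private final Map<String, String> activeTasksPerId = new TreeMap<>();
    private final Map<String, String> standbyTasksPerId = new TreeMap<>();

    void addActive(final String id, final String task) {
        activeTasksPerId.put(id, task);
    }

    void addStandby(final String id, final String task) {
        standbyTasksPerId.put(id, task);
    }

    // Builds an unmodifiable snapshot of all tasks each time it is called.
    // Callers cannot mutate it, and it does not track later additions.
    Collection<String> allTasks() {
        final List<String> all = new ArrayList<>(activeTasksPerId.values());
        all.addAll(standbyTasksPerId.values());
        return Collections.unmodifiableList(all);
    }
}
```

The trade-off in this sketch is a small allocation per call in exchange for not having to keep a third, combined map in sync with the active and standby maps.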



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -68,11 +67,6 @@ class ActiveTaskCreator {
     private final Map<TaskId, StreamsProducer> taskProducers;
     private final ProcessingMode processingMode;
 
-    // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
-    // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
-    // we receive a new assignment and they are revoked from the thread.
-    private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();

Review Comment:
   This is for case 2) in the description.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -859,16 +892,6 @@ void closeAndCleanUpTasks(final Collection<Task> activeTasks, final Collection<T
             closeTaskDirty(task);
         }
 
-        // TODO: change type to `StreamTask`

Review Comment:
   This is for case 3) in the description: this is already handled in `closeTaskClean`.


