Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/26 16:00:47 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

cadonna commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930069951


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -211,13 +209,36 @@ private RecordCollector createRecordCollector(final TaskId taskId,
         );
     }
 
+    /*
+     * TODO: we pass in the new input partitions to validate if they still match,
+     *       in the future, when we have a fixed partitions -> tasks mapping,
+     *       we should always reuse the input partitions and hence need no validation
+     */
     StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
                                            final Set<TopicPartition> inputPartitions,
                                            final Consumer<byte[], byte[]> consumer) {
+        if (!inputPartitions.equals(standbyTask.inputPartitions)) {
+            log.warn("Detected unmatched input partitions for task {} when recycling it from standby to active", standbyTask.id);
+        }
+
         final RecordCollector recordCollector = createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology);
-        final StreamTask task = standbyTask.recycle(time, cache, recordCollector, inputPartitions, consumer);
+        final StreamTask task = new StreamTask(
+            standbyTask.id,
+            inputPartitions,
+            standbyTask.topology,
+            consumer,
+            standbyTask.config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            standbyTask.stateMgr,
+            recordCollector,
+            standbyTask.processorContext,
+            standbyTask.logContext
+        );
 
-        log.trace("Created active task {} with assigned partitions {}", task.id, inputPartitions);
+        log.trace("Recycled active task {} from recycled standby with assigned partitions {}", task.id, inputPartitions);

Review Comment:
   ```suggestion
           log.trace("Created active task {} from recycled standby task with assigned partitions {}", task.id, inputPartitions);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -493,7 +531,9 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
         final List<Task> activeTasks = new LinkedList<>();
         for (final Task task : tasks.allTasks()) {
             try {
-                task.initializeIfNeeded();
+                if (task.initializeIfNeeded() && stateUpdater != null) {
+                    stateUpdater.add(task);
+                }

Review Comment:
   I am not sure this is the right place to add the task to the state updater. I would rather add brand-new tasks in `createNewTasks()`. We can add recycled tasks once we have read the tasks to recycle from the removed-tasks queue of the state updater and recycled them.
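
   A minimal sketch of the placement suggested above, assuming heavily simplified, hypothetical stand-ins (`SketchTask`, `SketchStateUpdater`, `SketchTaskManager`, `handleRemovedTasks()`, and a synchronous `remove()`/`drainRemovedTasks()` pair); these are not the real Kafka Streams classes or signatures, and the real state updater runs on its own thread. Brand-new tasks are handed to the updater where they are created, while recycled tasks are added back only after they have been drained from the removed-tasks queue and recycled.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.Deque;
   import java.util.List;
   import java.util.function.UnaryOperator;

   class StateUpdaterPlacementSketch {
       public static void main(final String[] args) {
           final SketchStateUpdater updater = new SketchStateUpdater();
           final SketchTaskManager manager = new SketchTaskManager(updater);
           final SketchTask task = () -> "0_0";
           manager.createNewTasks(List.of(task));
           updater.remove(task); // e.g. the task has to be recycled
           manager.handleRemovedTasks(t -> () -> t.id() + "-recycled");
           System.out.println(updater.restoring.get(0).id()); // prints: 0_0-recycled
       }
   }

   // Hypothetical, simplified stand-in for a task; only the id matters here.
   interface SketchTask {
       String id();
   }

   // Hypothetical, synchronous stand-in for the state updater.
   class SketchStateUpdater {
       final List<SketchTask> restoring = new ArrayList<>();
       final Deque<SketchTask> removedTasks = new ArrayDeque<>();

       void add(final SketchTask task) {
           restoring.add(task);
       }

       // Moves a task from the restoring list to the removed-tasks queue.
       void remove(final SketchTask task) {
           if (restoring.remove(task)) {
               removedTasks.add(task);
           }
       }

       // Returns all removed tasks and empties the queue.
       List<SketchTask> drainRemovedTasks() {
           final List<SketchTask> drained = new ArrayList<>(removedTasks);
           removedTasks.clear();
           return drained;
       }
   }

   class SketchTaskManager {
       private final SketchStateUpdater stateUpdater; // null if the state updater is disabled

       SketchTaskManager(final SketchStateUpdater stateUpdater) {
           this.stateUpdater = stateUpdater;
       }

       // Brand-new tasks are added to the state updater right where they are created.
       void createNewTasks(final Collection<SketchTask> newTasks) {
           if (stateUpdater != null) {
               newTasks.forEach(stateUpdater::add);
           }
       }

       // Recycled tasks are added back only after they were drained and recycled.
       void handleRemovedTasks(final UnaryOperator<SketchTask> recycle) {
           if (stateUpdater == null) {
               return;
           }
           for (final SketchTask removed : stateUpdater.drainRemovedTasks()) {
               stateUpdater.add(recycle.apply(removed));
           }
       }
   }
   ```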



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -219,7 +219,7 @@ public boolean isActive() {
      * @throws StreamsException fatal error, should close the thread
      */
     @Override
-    public void initializeIfNeeded() {
+    public boolean initializeIfNeeded() {

Review Comment:
   See my comment in `StandbyTask` regarding this return value.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java:
##########
@@ -109,11 +107,30 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre
         return createdTasks;
     }
 
+    /*
+     * TODO: we pass in the new input partitions to validate if they still match,
+     *       in the future, when we have a fixed partitions -> tasks mapping,
+     *       we should always reuse the input partitions and hence need no validation
+     */
     StandbyTask createStandbyTaskFromActive(final StreamTask streamTask,

Review Comment:
   Could you please add a unit test for this method?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##########
@@ -91,7 +92,7 @@ public boolean isActive() {
      * @throws StreamsException fatal error, should close the thread
      */
     @Override
-    public void initializeIfNeeded() {
+    public boolean initializeIfNeeded() {

Review Comment:
   I do not think you need the boolean return value here. See my other comment in `tryToCompleteRestoration()`. I would rather extract the body of the `if (state() == State.CREATED)` branch into a separate method and call that method in `TaskManager#createNewTasks()`. WDYT?
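
   A minimal sketch of that refactoring, assuming a simplified, hypothetical task (`SketchStandbyTask`) and a made-up name, `initializeFromCreated()`, for the extracted method. The state transition is simplified, and the real branch does more work (for example registering state stores). `initializeIfNeeded()` keeps its `void` signature, and `createNewTasks()` calls the extracted method directly on freshly created tasks.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   class ExtractCreatedBranchSketch {
       public static void main(final String[] args) {
           final SketchStandbyTask task = new SketchStandbyTask();
           new SketchTaskManager().createNewTasks(List.of(task));
           task.initializeIfNeeded(); // no-op: the task already left CREATED
           System.out.println(task.state() + " " + task.initSteps); // prints: RUNNING [register state stores]
       }
   }

   class SketchStandbyTask {
       enum State { CREATED, RUNNING }

       private State state = State.CREATED;
       final List<String> initSteps = new ArrayList<>();

       State state() {
           return state;
       }

       // Signature stays void; callers do not need a boolean to learn whether anything happened.
       public void initializeIfNeeded() {
           if (state() == State.CREATED) {
               initializeFromCreated();
           }
       }

       // Former body of the CREATED branch, extracted so TaskManager can call it directly.
       void initializeFromCreated() {
           if (state() != State.CREATED) {
               throw new IllegalStateException("Expected state CREATED but was " + state());
           }
           initSteps.add("register state stores"); // placeholder for the real initialization work
           state = State.RUNNING;                  // simplified transition
       }
   }

   class SketchTaskManager {
       // Brand-new tasks are always in CREATED, so they can be initialized unconditionally.
       void createNewTasks(final List<SketchStandbyTask> newTasks) {
           for (final SketchStandbyTask task : newTasks) {
               task.initializeFromCreated();
           }
       }
   }
   ```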



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##########
@@ -77,6 +93,22 @@ public void registerTaskError(final Task task, final Throwable t, final long now
         }
     }
 
+    Collection<Task> successfullyProcessed() {
+        return successfullyProcessed;
+    }
+
+    void addToSuccessfullyProcessed(final Task task) {
+        successfullyProcessed.add(task);
+    }
+
+    void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {
+        successfullyProcessed.remove(task);
+    }
+
+    void clearSuccessfullyProcessed() {
+        successfullyProcessed.clear();
+    }
+

Review Comment:
   Is this better suited for `TaskExecutor`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -211,13 +209,36 @@ private RecordCollector createRecordCollector(final TaskId taskId,
         );
     }
 
+    /*
+     * TODO: we pass in the new input partitions to validate if they still match,
+     *       in the future, when we have a fixed partitions -> tasks mapping,
+     *       we should always reuse the input partitions and hence need no validation
+     */
     StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,

Review Comment:
   Could you please add a unit test for this method?
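
   A sketch of the kind of checks such a test could make, assuming simplified, hypothetical stand-ins: partitions are plain strings instead of `TopicPartition`, `SketchStateManager` stands in for `standbyTask.stateMgr`, and the `warnings` list stands in for `log.warn(...)`. It only mirrors what the diff above does: the active task keeps the standby's id and state manager, uses the passed-in partitions, and a partition mismatch produces one warning. A real test would use JUnit and mocked dependencies instead.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;

   class CreateActiveTaskFromStandbySketchTest {
       public static void main(final String[] args) {
           shouldReuseIdAndStateManagerAndUseNewPartitions();
           shouldWarnOnUnmatchedInputPartitions();
           System.out.println("all checks passed");
       }

       static void shouldReuseIdAndStateManagerAndUseNewPartitions() {
           final SketchActiveTaskCreator creator = new SketchActiveTaskCreator();
           final SketchStandbyTask standby = new SketchStandbyTask("0_1", Set.of("topic-1"), new SketchStateManager());

           final SketchActiveTask active = creator.createActiveTaskFromStandby(standby, Set.of("topic-1"));

           check(active.id.equals(standby.id), "id must be kept");
           check(active.stateMgr == standby.stateMgr, "state manager must be reused, not recreated");
           check(active.inputPartitions.equals(Set.of("topic-1")), "passed-in partitions must be used");
           check(creator.warnings.isEmpty(), "no warning expected for matching partitions");
       }

       static void shouldWarnOnUnmatchedInputPartitions() {
           final SketchActiveTaskCreator creator = new SketchActiveTaskCreator();
           final SketchStandbyTask standby = new SketchStandbyTask("0_1", Set.of("topic-1"), new SketchStateManager());

           final SketchActiveTask active = creator.createActiveTaskFromStandby(standby, Set.of("topic-2"));

           check(active.inputPartitions.equals(Set.of("topic-2")), "passed-in partitions must be used");
           check(creator.warnings.size() == 1, "exactly one warning expected");
       }

       private static void check(final boolean condition, final String message) {
           if (!condition) {
               throw new AssertionError(message);
           }
       }
   }

   // Hypothetical stand-ins carrying only what the diff passes to the new StreamTask.
   final class SketchStateManager { }

   final class SketchStandbyTask {
       final String id;
       final Set<String> inputPartitions;
       final SketchStateManager stateMgr;

       SketchStandbyTask(final String id, final Set<String> inputPartitions, final SketchStateManager stateMgr) {
           this.id = id;
           this.inputPartitions = inputPartitions;
           this.stateMgr = stateMgr;
       }
   }

   final class SketchActiveTask {
       final String id;
       final Set<String> inputPartitions;
       final SketchStateManager stateMgr;

       SketchActiveTask(final String id, final Set<String> inputPartitions, final SketchStateManager stateMgr) {
           this.id = id;
           this.inputPartitions = inputPartitions;
           this.stateMgr = stateMgr;
       }
   }

   final class SketchActiveTaskCreator {
       final List<String> warnings = new ArrayList<>(); // stands in for log.warn(...)

       SketchActiveTask createActiveTaskFromStandby(final SketchStandbyTask standbyTask, final Set<String> inputPartitions) {
           if (!inputPartitions.equals(standbyTask.inputPartitions)) {
               warnings.add("Detected unmatched input partitions for task " + standbyTask.id
                   + " when recycling it from standby to active");
           }
           return new SketchActiveTask(standbyTask.id, inputPartitions, standbyTask.stateMgr);
       }
   }
   ```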



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -480,6 +503,21 @@ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRe
         }
     }
 
+    private void convertActiveToStandby(final StreamTask activeTask,
+                                        final Set<TopicPartition> partitions) {
+        activeTask.recycleAndConvert();

Review Comment:
   Could we do this inside `createStandbyTaskFromActive()`?
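
   A minimal sketch of what that could look like, assuming hypothetical, simplified stand-ins. `prepareForRecycle()` borrows the name proposed later in this review for `recycleAndConvert()`, and `closedProducers` stands in for `activeTaskCreator.closeAndRemoveTaskProducerIfNeeded()`. With the preparation inside the creator, `convertActiveToStandby()` cannot forget that step. Note that the producer is then closed after the standby task is created rather than before it, an ordering assumption that would need checking against the real producer lifecycle.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;

   class RecycleInsideCreatorSketch {
       public static void main(final String[] args) {
           final SketchTaskManager manager = new SketchTaskManager();
           final SketchStreamTask active = new SketchStreamTask("0_2");
           final SketchStandbyTask standby = manager.convertActiveToStandby(active, Set.of("topic-2"));
           System.out.println(standby.id + " " + active.preparedForRecycle + " " + manager.closedProducers);
           // prints: 0_2 true [0_2]
       }
   }

   // Hypothetical active task; prepareForRecycle() stands in for recycleAndConvert().
   final class SketchStreamTask {
       final String id;
       boolean preparedForRecycle = false;

       SketchStreamTask(final String id) {
           this.id = id;
       }

       void prepareForRecycle() {
           preparedForRecycle = true; // placeholder for the real preparation work
       }
   }

   final class SketchStandbyTask {
       final String id;
       final Set<String> inputPartitions;

       SketchStandbyTask(final String id, final Set<String> inputPartitions) {
           this.id = id;
           this.inputPartitions = inputPartitions;
       }
   }

   final class SketchStandbyTaskCreator {
       // The creator owns the whole conversion, including preparing the old task.
       SketchStandbyTask createStandbyTaskFromActive(final SketchStreamTask streamTask, final Set<String> inputPartitions) {
           streamTask.prepareForRecycle();
           return new SketchStandbyTask(streamTask.id, inputPartitions);
       }
   }

   final class SketchTaskManager {
       final SketchStandbyTaskCreator standbyTaskCreator = new SketchStandbyTaskCreator();
       final List<String> closedProducers = new ArrayList<>();

       SketchStandbyTask convertActiveToStandby(final SketchStreamTask activeTask, final Set<String> partitions) {
           final SketchStandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
           closedProducers.add(activeTask.id); // stands in for closeAndRemoveTaskProducerIfNeeded(...)
           // the diff then calls tasks.replaceActiveWithStandby(standbyTask)
           return standbyTask;
       }
   }
   ```

   The same move would apply symmetrically to `convertStandbyToActive()` and `createActiveTaskFromStandby()`.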



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -480,6 +503,21 @@ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRe
         }
     }
 
+    private void convertActiveToStandby(final StreamTask activeTask,
+                                        final Set<TopicPartition> partitions) {
+        activeTask.recycleAndConvert();
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
+        final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
+        tasks.replaceActiveWithStandby(standbyTask);
+    }
+
+    private void convertStandbyToActive(final StandbyTask standbyTask,
+                                        final Set<TopicPartition> partitions) {
+        standbyTask.recycleAndConvert();

Review Comment:
   See my comment above in `convertActiveToStandby()`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -544,7 +548,7 @@ public void updateInputPartitions(final Set<TopicPartition> topicPartitions, fin
     }
 
     @Override
-    public void closeCleanAndRecycleState() {
+    public void recycleAndConvert() {

Review Comment:
   This name is a bit misleading. Something like `prepareForRecycle()` or similar would be better, IMO.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -377,7 +390,17 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             throw first.getValue();
         }
 
-        tasks.createTasks(activeTasksToCreate, standbyTasksToCreate);
+        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void createNewTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
+        final Collection<Task> newActiveTasks = activeTasksToCreate.isEmpty() ?
+            Collections.emptySet() : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate);
+        final Collection<Task> newStandbyTask = standbyTasksToCreate.isEmpty() ?
+            Collections.emptySet() : standbyTaskCreator.createTasks(standbyTasksToCreate);

Review Comment:
   Why do you need the conditional expression here? `ActiveTaskCreator#createTasks()` already returns an empty collection when `activeTasksToCreate` is empty, and the same is true for `StandbyTaskCreator#createTasks()` when `standbyTasksToCreate` is empty.
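
   A minimal sketch of the simplification, assuming a hypothetical `SketchTaskCreator` whose `createTasks()` returns an empty collection for an empty input map, as the comment states for the real creators. The real `ActiveTaskCreator#createTasks()` also takes the main consumer, which is omitted here. Without the ternary, both creators are simply called unconditionally.

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   class CreateNewTasksSketch {
       public static void main(final String[] args) {
           final SketchTaskManager manager = new SketchTaskManager();
           manager.createNewTasks(Map.of(), Map.of("0_1", Set.of("topic-1")));
           System.out.println(manager.tasks); // prints: [0_1]
       }
   }

   // Hypothetical creator: one task per entry, so an empty map yields an empty collection.
   final class SketchTaskCreator {
       Collection<String> createTasks(final Map<String, Set<String>> tasksToBeCreated) {
           final List<String> createdTasks = new ArrayList<>();
           for (final String taskId : tasksToBeCreated.keySet()) {
               createdTasks.add(taskId);
           }
           return createdTasks;
       }
   }

   final class SketchTaskManager {
       private final SketchTaskCreator activeTaskCreator = new SketchTaskCreator();
       private final SketchTaskCreator standbyTaskCreator = new SketchTaskCreator();
       final List<String> tasks = new ArrayList<>();

       void createNewTasks(final Map<String, Set<String>> activeTasksToCreate,
                           final Map<String, Set<String>> standbyTasksToCreate) {
           // No isEmpty() checks: the creators already handle empty input.
           final Collection<String> newActiveTasks = activeTaskCreator.createTasks(activeTasksToCreate);
           final Collection<String> newStandbyTasks = standbyTaskCreator.createTasks(standbyTasksToCreate);
           tasks.addAll(newActiveTasks);
           tasks.addAll(newStandbyTasks);
       }
   }
   ```

   The ternary would only save the method call itself, which matters only if `createTasks()` had side effects on empty input.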



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
