Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/24 12:21:10 UTC

[GitHub] [kafka] cadonna opened a new pull request, #12554: KAFKA-10199: Handle restored tasks output by state updater

cadonna opened a new pull request, #12554:
URL: https://github.com/apache/kafka/pull/12554

   Once the state updater has restored an active task, it puts the task
   into an output queue. The stream thread reads the restored
   active task from the output queue and, after verifying
   that the task is still owned by the stream thread, transitions
   it to RUNNING.
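   A minimal sketch of the hand-off described above, assuming a thread-safe queue between the state updater thread and the stream thread. `RestoredTaskHandoffSketch`, `SketchTask`, `TaskState`, and the string task ids are hypothetical stand-ins, not Kafka Streams classes; the sketch only shows the ownership check before the transition to RUNNING.

   ```java
   import java.util.HashSet;
   import java.util.Set;
   import java.util.concurrent.ConcurrentLinkedQueue;

   // Hypothetical, heavily simplified stand-ins; not the Kafka Streams classes.
   class RestoredTaskHandoffSketch {
       enum TaskState { RESTORING, RUNNING }

       static final class SketchTask {
           final String id;
           volatile TaskState state = TaskState.RESTORING;

           SketchTask(final String id) {
               this.id = id;
           }
       }

       // Output queue: written by the state updater thread, drained by the stream thread.
       final ConcurrentLinkedQueue<SketchTask> restoredActiveTasks = new ConcurrentLinkedQueue<>();
       // Ids of the tasks currently assigned to this stream thread.
       final Set<String> ownedTaskIds = new HashSet<>();

       // State updater side: hand over a task whose restoration is complete.
       void onRestorationCompleted(final SketchTask task) {
           restoredActiveTasks.add(task);
       }

       // Stream thread side: transition still-owned tasks to RUNNING and return
       // their ids. Tasks that are no longer owned are skipped (out of scope here).
       Set<String> handleRestoredTasks() {
           final Set<String> transitioned = new HashSet<>();
           SketchTask task;
           while ((task = restoredActiveTasks.poll()) != null) {
               if (ownedTaskIds.contains(task.id)) {
                   task.state = TaskState.RUNNING;
                   transitioned.add(task.id);
               }
           }
           return transitioned;
       }

       public static void main(final String[] args) {
           final RestoredTaskHandoffSketch sketch = new RestoredTaskHandoffSketch();
           sketch.ownedTaskIds.add("0_0");
           sketch.onRestorationCompleted(new SketchTask("0_0"));
           sketch.onRestorationCompleted(new SketchTask("0_1")); // no longer owned
           System.out.println(sketch.handleRestoredTasks()); // prints [0_0]
       }
   }
   ```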
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12554:
URL: https://github.com/apache/kafka/pull/12554#issuecomment-1230537527

   Approved, thanks @cadonna 




[GitHub] [kafka] cadonna commented on pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12554:
URL: https://github.com/apache/kafka/pull/12554#issuecomment-1230542970

   Failures are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r954203463


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1038,7 +1104,7 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         return offsetSum;
     }
 
-    private void closeTaskDirty(final Task task) {
+    private void closeTaskDirty(final Task task, final boolean removeFromTasksRegistry) {

Review Comment:
   I have also been thinking about the task registry (`tasks`) and when to remove tasks from it. In the new model, this class only keeps track of running active tasks, while standby tasks and restoring active tasks live in the state updater, so the task manager needs to take the union of both to return all managed tasks. So when we close a task:
   
   1) If the task is a running active task, then we also need to remove it from `tasks`.
   2) If the task was retrieved from the state updater, then we do not need to remove it from `tasks`.
   
   That means we also need to consider this for clean closes. As my other comment above points out, today we use two `closeTaskClean` methods: one that captures exceptions and does not remove the task from `tasks` (used for case 2), and one that does not capture exceptions and does remove the task from `tasks` (used for case 1).
   
   I think we should clean this up by having a single `closeTaskClean`/`closeTaskDirty` that does not remove the task from `tasks` internally (I also suggest capturing exceptions at the caller rather than inside, but that's open for discussion :)), and let the caller decide whether to remove the task from `tasks` depending on whether the call is for case 1) or 2) above.
   
   Also, I feel that once we complete this, `tasks` would contain far fewer meaningful fields, and we could potentially dissolve `tasks` and keep all its bookkeeping in the TaskManager, e.g. as `runningActiveTasks`, with functions for adding and removing those tasks for unit-testing purposes. But that's for a future discussion.
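   A minimal sketch of the proposal, assuming simplified types: one core `closeTaskClean` that neither touches the registry nor catches exceptions, plus two callers that decide about registry removal for case 1) and case 2). `CloseAtCallerSketch`, `SketchTask`, `closeRunningActiveTask`, `closeTaskFromStateUpdater`, and `recordFailure` are hypothetical names, not the code that was merged.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical sketch of the proposal; names and types are not Kafka's.
   class CloseAtCallerSketch {
       static final class SketchTask {
           final String id;
           final boolean failOnClose; // lets the demo simulate a failing close
           boolean closed;

           SketchTask(final String id, final boolean failOnClose) {
               this.id = id;
               this.failOnClose = failOnClose;
           }
       }

       // Stand-in for `tasks`: in the new model it only holds running active tasks.
       final Set<String> runningActiveTaskIds = new HashSet<>();
       final Set<SketchTask> tasksToCloseDirty = new HashSet<>();
       final Map<String, RuntimeException> taskExceptions = new HashMap<>();

       // Single core method: closes the task; no registry removal, no exception capturing.
       void closeTaskClean(final SketchTask task) {
           if (task.failOnClose) {
               throw new IllegalStateException("Failed to close task " + task.id);
           }
           task.closed = true;
       }

       // Case 1): a running active task; the caller also removes it from the registry.
       void closeRunningActiveTask(final SketchTask task) {
           try {
               closeTaskClean(task);
               runningActiveTaskIds.remove(task.id);
           } catch (final RuntimeException e) {
               recordFailure(task, e);
           }
       }

       // Case 2): a task drained from the state updater; it was never in the registry.
       void closeTaskFromStateUpdater(final SketchTask task) {
           try {
               closeTaskClean(task);
           } catch (final RuntimeException e) {
               recordFailure(task, e);
           }
       }

       // Exception handling shared by the callers: remember the error, close dirty later.
       private void recordFailure(final SketchTask task, final RuntimeException e) {
           tasksToCloseDirty.add(task);
           taskExceptions.putIfAbsent(task.id, e);
       }

       public static void main(final String[] args) {
           final CloseAtCallerSketch sketch = new CloseAtCallerSketch();
           sketch.runningActiveTaskIds.add("0_0");
           sketch.closeRunningActiveTask(new SketchTask("0_0", false));
           sketch.closeTaskFromStateUpdater(new SketchTask("1_0", true));
           System.out.println(sketch.runningActiveTaskIds.isEmpty()); // true
           System.out.println(sketch.taskExceptions.keySet());        // [1_0]
       }
   }
   ```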





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r955703303


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
   Ack, that makes sense.





[GitHub] [kafka] cadonna commented on pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12554:
URL: https://github.com/apache/kafka/pull/12554#issuecomment-1225647992

   Call for review: @wcarlson5 @lihaosky 




[GitHub] [kafka] cadonna merged pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
cadonna merged PR #12554:
URL: https://github.com/apache/kafka/pull/12554




[GitHub] [kafka] cadonna commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r954595559


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
   I called this method `restoresActiveTasks()` instead of `hasActiveTasks()` or `containsActiveTasks()` because there might also be active tasks in restoration that are paused. IMO, we should not return true if the state updater only has paused active tasks in restoration. If we did, we would not transition the stream thread to `RUNNING` when all non-paused active tasks have left the state updater.
   
   However, that would also mean that if all non-paused active tasks are `RUNNING` (i.e. the stream thread is also `RUNNING`) and the paused active tasks in restoration are unpaused, `restoresActiveTasks()` would return `true` again (i.e. there are active tasks in restoration) without a rebalance in between. Because of this case, we need to be careful not to switch the state of the stream thread if the stream thread is `RUNNING` and `restoresActiveTasks()` returns `true`.
   
   For these reasons, I think it is not a good idea to provide a default implementation of `restoresActiveTasks()`.
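   The semantics described in this reply can be sketched as a small model, assuming simplified types: `restoresActiveTasks()` ignores paused active tasks, and the stream thread never leaves `RUNNING` just because a task was unpaused. `RestoresActiveTasksSketch`, `UpdaterTask`, `ThreadState` (a trimmed subset of thread states), and `maybeTransitToRunning` are hypothetical names for illustration only.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified model; not the real StateUpdater or StreamThread.
   class RestoresActiveTasksSketch {
       // Simplified subset of stream thread states.
       enum ThreadState { PARTITIONS_ASSIGNED, RUNNING }

       static final class UpdaterTask {
           final boolean active;
           boolean paused;

           UpdaterTask(final boolean active, final boolean paused) {
               this.active = active;
               this.paused = paused;
           }
       }

       final List<UpdaterTask> tasksInUpdater = new ArrayList<>();
       ThreadState threadState = ThreadState.PARTITIONS_ASSIGNED;

       // True only if at least one active task is in restoration and not paused.
       boolean restoresActiveTasks() {
           return tasksInUpdater.stream().anyMatch(t -> t.active && !t.paused);
       }

       // Move to RUNNING once no non-paused active task is restoring, but never
       // switch away from RUNNING when an unpaused task starts restoring again.
       void maybeTransitToRunning() {
           if (threadState == ThreadState.RUNNING) {
               return;
           }
           if (!restoresActiveTasks()) {
               threadState = ThreadState.RUNNING;
           }
       }

       public static void main(final String[] args) {
           final RestoresActiveTasksSketch sketch = new RestoresActiveTasksSketch();
           final UpdaterTask pausedActive = new UpdaterTask(true, true);
           sketch.tasksInUpdater.add(pausedActive);
           sketch.tasksInUpdater.add(new UpdaterTask(false, false)); // a standby task
           sketch.maybeTransitToRunning();
           System.out.println(sketch.threadState); // RUNNING
           pausedActive.paused = false; // unpaused without a rebalance
           System.out.println(sketch.restoresActiveTasks()); // true
           sketch.maybeTransitToRunning();
           System.out.println(sketch.threadState); // still RUNNING
       }
   }
   ```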





[GitHub] [kafka] cadonna commented on pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12554:
URL: https://github.com/apache/kafka/pull/12554#issuecomment-1230037477

   @guozhangwang Do you think this PR is ready to merge? If yes, could you approve it?




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r954204311


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -565,6 +583,303 @@ public void shouldHandleRemovedTasksFromStateUpdater() {
     }
 
     @Test
+    public void shouldTransitRestoredTaskToRunning() {

Review Comment:
   Nice mocks, love them!



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -673,44 +772,11 @@ private void handleRemovedTasksFromStateUpdater() {
         final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
 
         for (final Task task : stateUpdater.drainRemovedTasks()) {
-            final TaskId taskId = task.id();
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                try {
-                    final Task newTask = task.isActive() ?
-                        convertActiveToStandby((StreamTask) task, inputPartitions) :
-                        convertStandbyToActive((StandbyTask) task, inputPartitions);
-                    newTask.initializeIfNeeded();
-                    stateUpdater.add(newTask);
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", taskId);
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(taskId, e);
-                }
+                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
-                try {
-                    task.suspend();
-                    task.closeClean();
-                    if (task.isActive()) {
-                        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", task.id());
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(task.id(), e);
-                }
+                closeTaskClean(task, tasksToCloseDirty, taskExceptions);

Review Comment:
   We used to wrap the logic for collecting exceptions and updating close-dirty tasks in a single call, but I found it a bit clumsy and hard to reason about given the different exception handling (at least for now; I hope we can get simpler exception handling in the refactoring). So I peeled it off, and now the existing `closeTaskClean` only does the closing part:
   
   ```
   private void closeTaskClean(final Task task) {
       task.closeClean();
       tasks.removeTask(task);
       if (task.isActive()) {
           activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
       }
   }
   ```
   
   But the reason I could not use that function and catch the exception here is that, in our refactoring, the tasks to close cleanly are no longer in `tasks`.
   
   Thinking about this a bit more, I still feel that we should do the exception catching outside the core logic of closing tasks, but we can still try to remove some duplicated code here, e.g. by wrapping the whole if-else block in a single `try-catch` rather than doing that per branch.
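   A minimal sketch of the single `try-catch` idea, assuming simplified types: the core methods throw, and one `catch` around the whole if-else block does the shared bookkeeping (mirroring the removed per-branch blocks in the diff). `SingleTryCatchSketch`, `SketchTask`, and the pending-id sets are hypothetical; recycling is reduced to setting a flag.

   ```java
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;

   // Hypothetical sketch: one try-catch around the whole if-else, not one per branch.
   class SingleTryCatchSketch {
       static final class SketchTask {
           final String id;
           final boolean failOnClose; // lets the demo simulate a failing close
           boolean recycled;
           boolean closed;

           SketchTask(final String id, final boolean failOnClose) {
               this.id = id;
               this.failOnClose = failOnClose;
           }
       }

       final Set<String> pendingToRecycle = new HashSet<>();
       final Set<String> pendingToCloseClean = new HashSet<>();
       final Set<SketchTask> tasksToCloseDirty = new TreeSet<>(Comparator.comparing((SketchTask t) -> t.id));
       final Map<String, RuntimeException> taskExceptions = new HashMap<>();

       // Core logic only: these methods may throw and do not capture exceptions.
       void recycleTask(final SketchTask task) {
           task.recycled = true;
       }

       void closeTaskClean(final SketchTask task) {
           if (task.failOnClose) {
               throw new IllegalStateException("Failed to close task " + task.id);
           }
           task.closed = true;
       }

       void handleRemovedTasks(final List<SketchTask> removedTasks) {
           for (final SketchTask task : removedTasks) {
               try {
                   if (pendingToRecycle.remove(task.id)) {
                       recycleTask(task);
                   } else if (pendingToCloseClean.remove(task.id)) {
                       closeTaskClean(task);
                   }
               } catch (final RuntimeException e) {
                   // Shared handling for every branch: close dirty later, keep first error.
                   if (!task.closed) {
                       tasksToCloseDirty.add(task);
                   }
                   taskExceptions.putIfAbsent(task.id, e);
               }
           }
       }

       public static void main(final String[] args) {
           final SingleTryCatchSketch sketch = new SingleTryCatchSketch();
           sketch.pendingToRecycle.add("0_0");
           sketch.pendingToCloseClean.add("0_1");
           sketch.pendingToCloseClean.add("0_2");
           sketch.handleRemovedTasks(List.of(
               new SketchTask("0_0", false),
               new SketchTask("0_1", true),
               new SketchTask("0_2", false)));
           System.out.println(sketch.taskExceptions.keySet()); // [0_1]
       }
   }
   ```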
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
   nit: `hasActiveTasks`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1038,7 +1104,7 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         return offsetSum;
     }
 
-    private void closeTaskDirty(final Task task) {
+    private void closeTaskDirty(final Task task, final boolean removeFromTasksRegistry) {

Review Comment:
   I have also been thinking about the task registry (`tasks`) and when to remove tasks from it. In the new model, this class only keeps track of running active tasks, while standby tasks and restoring active tasks live in the state updater, so the task manager needs to take the union of both to return all managed tasks. So when we close a task:
   
   1) If the task is a running active task, then we also need to remove it from `tasks`.
   2) If the task was retrieved from the state updater, then we do not need to remove it from `tasks`.
   
   That means we also need to consider this for clean closes. As my other comment above points out, today we use two `closeTaskClean` methods: one that captures exceptions and does not remove the task from `tasks` (used for case 2), and one that does not capture exceptions and does remove the task from `tasks` (used for case 1).
   
   I think we should clean this up by having a single `closeTaskClean`/`closeTaskDirty` that does not remove the task from `tasks` internally (I also suggest capturing exceptions at the caller rather than inside, but that's open for discussion :)), and let the caller decide whether to remove the task from `tasks` depending on whether the call is for case 1) or 2) above.
   
   Also, I feel that once we complete this, `tasks` would contain far fewer meaningful fields, and we could potentially dissolve `tasks` and just keep it in the TaskManager as `runningActiveTasks`. But that's for a future discussion.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
   Also nit: could we just implement it as `!getActiveTasks().isEmpty()` in a default method of the interface, so that mocks do not need to implement it again?
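   A minimal sketch of the suggested default method, using a hypothetical, trimmed-down `SketchStateUpdater` interface with string task ids rather than the real `StateUpdater`. Because the interface then has a single abstract method, a stub can be a lambda that only supplies `getActiveTasks()`. Assuming `getActiveTasks()` also returns paused active tasks, this default would not match the paused-task semantics described in the earlier reply on this method.

   ```java
   import java.util.Collections;
   import java.util.Set;

   class DefaultMethodSketch {
       public static void main(final String[] args) {
           // Stubs only implement getActiveTasks(); restoresActiveTasks() comes for free.
           final SketchStateUpdater idle = Collections::emptySet;
           final SketchStateUpdater restoring = () -> Set.of("0_0");
           System.out.println(idle.restoresActiveTasks());      // false
           System.out.println(restoring.restoresActiveTasks()); // true
       }
   }

   // Hypothetical, trimmed-down stand-in for the StateUpdater interface.
   interface SketchStateUpdater {
       Set<String> getActiveTasks();

       // The suggested default implementation.
       default boolean restoresActiveTasks() {
           return !getActiveTasks().isEmpty();
       }
   }
   ```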



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -673,44 +772,11 @@ private void handleRemovedTasksFromStateUpdater() {
         final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
 
         for (final Task task : stateUpdater.drainRemovedTasks()) {
-            final TaskId taskId = task.id();
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                try {
-                    final Task newTask = task.isActive() ?
-                        convertActiveToStandby((StreamTask) task, inputPartitions) :
-                        convertStandbyToActive((StandbyTask) task, inputPartitions);
-                    newTask.initializeIfNeeded();
-                    stateUpdater.add(newTask);
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", taskId);
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(taskId, e);
-                }
+                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
-                try {
-                    task.suspend();
-                    task.closeClean();
-                    if (task.isActive()) {
-                        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", task.id());
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(task.id(), e);
-                }
+                closeTaskClean(task, tasksToCloseDirty, taskExceptions);

Review Comment:
   And to further remove duplicated logic, I think we can consolidate `handleRemovedTasksFromStateUpdater` and `handleRestoredTasks` into e.g. a `tryHandleTasksFromStateUpdater` that checks whether the task should be 1) recycled, 2) closed clean / dirty, or 3) have its input partitions updated (and then be given back to the state updater). If the task meets any of these conditions, it returns true, otherwise false.
   
   Then in `handleRemovedTasksFromStateUpdater`:
   
   ```
   if (!tryHandleTasksFromStateUpdater(..)) {
       throw new IllegalStateException("this should not happen");
   }
   ```
   
   And in `handleRestoredTasksFromStateUpdater`:
   
   ```
   if (!tryHandleTasksFromStateUpdater(..)) {
       transitRestoredTaskToRunning(task);
   }
   ```
   
   WDYT?
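   The suggested consolidation can be sketched as follows, assuming tasks are represented by string ids and the three pending actions by simple sets. `TryHandleSketch`, the pending-id sets, and the `actions` log are hypothetical; the actual recycle, close, and input-partition logic is not modelled, only the control flow shared by the two callers.

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical sketch of the suggested consolidation; ids stand in for tasks.
   class TryHandleSketch {
       final Set<String> pendingToRecycle = new HashSet<>();
       final Set<String> pendingToClose = new HashSet<>();
       final Set<String> pendingToUpdateInputPartitions = new HashSet<>();
       final Set<String> running = new HashSet<>();
       final List<String> actions = new ArrayList<>(); // records which pending action applied

       // Returns true if one of the pending actions applied to the task.
       boolean tryHandleTasksFromStateUpdater(final String taskId) {
           if (pendingToRecycle.remove(taskId)) {
               actions.add("recycle " + taskId);
               return true;
           }
           if (pendingToClose.remove(taskId)) {
               actions.add("close " + taskId);
               return true;
           }
           if (pendingToUpdateInputPartitions.remove(taskId)) {
               actions.add("update input partitions " + taskId); // then back to the state updater
               return true;
           }
           return false;
       }

       void handleRemovedTask(final String taskId) {
           if (!tryHandleTasksFromStateUpdater(taskId)) {
               throw new IllegalStateException("this should not happen");
           }
       }

       void handleRestoredTask(final String taskId) {
           if (!tryHandleTasksFromStateUpdater(taskId)) {
               running.add(taskId); // stands in for transitRestoredTaskToRunning(task)
           }
       }

       public static void main(final String[] args) {
           final TryHandleSketch sketch = new TryHandleSketch();
           sketch.pendingToClose.add("0_1");
           sketch.handleRestoredTask("0_0"); // no pending action: goes to RUNNING
           sketch.handleRestoredTask("0_1"); // pending close: handled instead
           System.out.println(sketch.running); // [0_0]
           System.out.println(sketch.actions); // [close 0_1]
       }
   }
   ```

   As the replies below note, this shape relies on the caller to tell removed and restored tasks apart, which is the part that was deferred to follow-up PRs.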





[GitHub] [kafka] wcarlson5 commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r955423851


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -183,6 +185,21 @@ public int hashCode() {
      */
     Set<StreamTask> getActiveTasks();
 
+    /**
+     * Returns if the state updater restores active tasks.
+     *
+     * The state updater restores active tasks if at least one active task was added with the {@link StateUpdater#add(Task)}

Review Comment:
   Is this whether it is _currently_ restoring an active task? It reads as if it means whether it is able to do so. I guess I am not sure what this method is for.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1038,7 +1104,7 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         return offsetSum;
     }
 
-    private void closeTaskDirty(final Task task) {
+    private void closeTaskDirty(final Task task, final boolean removeFromTasksRegistry) {

Review Comment:
   +1, I agree that it would be best to separate this logic out. I think it would help with maintainability and readability.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r955703738


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -673,44 +772,11 @@ private void handleRemovedTasksFromStateUpdater() {
         final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
 
         for (final Task task : stateUpdater.drainRemovedTasks()) {
-            final TaskId taskId = task.id();
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                try {
-                    final Task newTask = task.isActive() ?
-                        convertActiveToStandby((StreamTask) task, inputPartitions) :
-                        convertStandbyToActive((StandbyTask) task, inputPartitions);
-                    newTask.initializeIfNeeded();
-                    stateUpdater.add(newTask);
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", taskId);
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(taskId, e);
-                }
+                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
-                try {
-                    task.suspend();
-                    task.closeClean();
-                    if (task.isActive()) {
-                        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", task.id());
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(task.id(), e);
-                }
+                closeTaskClean(task, tasksToCloseDirty, taskExceptions);

Review Comment:
   Sounds good :) Let's do the queue merging in follow-up PRs then.





[GitHub] [kafka] cadonna commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r954702607


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -673,44 +772,11 @@ private void handleRemovedTasksFromStateUpdater() {
         final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
 
         for (final Task task : stateUpdater.drainRemovedTasks()) {
-            final TaskId taskId = task.id();
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                try {
-                    final Task newTask = task.isActive() ?
-                        convertActiveToStandby((StreamTask) task, inputPartitions) :
-                        convertStandbyToActive((StandbyTask) task, inputPartitions);
-                    newTask.initializeIfNeeded();
-                    stateUpdater.add(newTask);
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", taskId);
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(taskId, e);
-                }
+                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
-                try {
-                    task.suspend();
-                    task.closeClean();
-                    if (task.isActive()) {
-                        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", task.id());
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(task.id(), e);
-                }
+                closeTaskClean(task, tasksToCloseDirty, taskExceptions);

Review Comment:
   I think reducing code duplication makes a lot of sense. However, can we postpone these refactorings until after we have a running system with the state updater?
   I tried to apply your suggestions regarding `tryHandleTasksFromStateUpdater()`, but I do not like the idea of using a boolean to distinguish between removed and restored tasks. I think it would be better to merge the queues of restored and removed tasks, as I proposed in our last discussion.





[GitHub] [kafka] cadonna commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r953802616


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -50,10 +50,11 @@
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.easymock.EasyMock.anyBoolean;

Review Comment:
   Removed EasyMock dependency since we do not use EasyMock in this test class.





[GitHub] [kafka] guozhangwang commented on pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12554:
URL: https://github.com/apache/kafka/pull/12554#issuecomment-1228111575

   Thanks @cadonna for your replies.
   
   Regarding https://github.com/apache/kafka/pull/12554#discussion_r954203463 I think similarly we can do that in follow-up PRs if you like. LGTM.




[GitHub] [kafka] cadonna commented on a diff in pull request #12554: KAFKA-10199: Handle restored tasks output by state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r955764822


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -183,6 +185,21 @@ public int hashCode() {
      */
     Set<StreamTask> getActiveTasks();
 
+    /**
+     * Returns if the state updater restores active tasks.
+     *
+     * The state updater restores active tasks if at least one active task was added with the {@link StateUpdater#add(Task)}

Review Comment:
   I am not sure what you mean by "if it is able to do so". The state updater manages active tasks in restoration and standby tasks. This method should return true if there are non-paused active tasks somewhere in the state updater. The return value is used to decide whether the stream thread can transition to `RUNNING`.
   
   I just realised that I forgot to add the non-paused part to the Javadoc.


