Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/14 18:34:58 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8667: KAFKA-9994: Handle task migrated inside corruption path

vvcephei commented on a change in pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#discussion_r425348145



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -555,28 +555,35 @@ void runLoop() {
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                              "Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
-                taskManager.commit(
-                    taskManager.tasks()
-                        .values()
-                        .stream()
-                        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                        .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                        .collect(Collectors.toSet())
-                );
-                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                try {
+                    taskManager.commit(
+                        taskManager.tasks()
+                            .values()
+                            .stream()
+                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+                            .collect(Collectors.toSet())
+                    );
+                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                } catch (final TaskMigratedException taskMigrated) {
+                    handleTaskMigrated(taskMigrated);

Review comment:
       Should we also handle the corrupted tasks here (before this line), so that they are already cleaned up before the next round? Alternatively, should we move `taskManager.handleCorruption(e.corruptedTaskWithChangelogs());` to before the attempted commit? It looks like it could sit outside the try block as well.
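
       To make the second option concrete, here is a rough, self-contained sketch of the ordering only. Everything in it is a hypothetical stand-in rather than the real Kafka Streams code: `FakeTaskManager`, `TaskMigrated`, and `onTaskCorrupted` are invented names, task ids are plain strings, and it assumes that handling corruption does not itself throw the migration exception, which I have not verified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class CorruptionOrderingSketch {

    // Hypothetical stand-in for TaskMigratedException; not the real Kafka class.
    static class TaskMigrated extends RuntimeException {
        TaskMigrated(final String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for TaskManager that only records the order of calls.
    static class FakeTaskManager {
        final List<String> calls = new ArrayList<>();
        final boolean commitThrows;

        FakeTaskManager(final boolean commitThrows) {
            this.commitThrows = commitThrows;
        }

        void handleCorruption(final Set<String> corruptedTaskIds) {
            calls.add("handleCorruption");
        }

        void commit(final Set<String> healthyTaskIds) {
            if (commitThrows) {
                throw new TaskMigrated("simulated migration during commit");
            }
            calls.add("commit");
        }

        void handleTaskMigrated() {
            calls.add("handleTaskMigrated");
        }
    }

    // Option 2 from the comment: clean up the corrupted tasks first, outside the
    // try block, so they are handled even if the commit reports a migration.
    static List<String> onTaskCorrupted(final FakeTaskManager taskManager,
                                        final Set<String> corruptedTaskIds,
                                        final Set<String> healthyTaskIds) {
        taskManager.handleCorruption(corruptedTaskIds);
        try {
            taskManager.commit(healthyTaskIds);
        } catch (final TaskMigrated taskMigrated) {
            taskManager.handleTaskMigrated();
        }
        return taskManager.calls;
    }

    public static void main(final String[] args) {
        final FakeTaskManager taskManager = new FakeTaskManager(true);
        // prints [handleCorruption, handleTaskMigrated]
        System.out.println(onTaskCorrupted(taskManager, Set.of("0_1"), Set.of("0_0")));
    }
}
```

       With this ordering the corrupted tasks are cleaned up whether or not the commit succeeds. In the version in the diff, a migration thrown from the commit would skip `handleCorruption` entirely until the next round.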




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org