Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/25 12:50:48 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12519: [WIP] KAFKA-10199: Handle exceptions from state updater

cadonna commented on code in PR #12519:
URL: https://github.com/apache/kafka/pull/12519#discussion_r954839073


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -877,6 +877,8 @@ private void initializeAndRestorePhase() {
             // transit to restore active is idempotent so we can call it multiple times
             changelogReader.enforceRestoreActive();
 
+            taskManager.tryHandleExceptionsFromStateUpdater();

Review Comment:
   Could we also handle the exceptions inside `tryToCompleteRestoration()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -667,6 +672,25 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    public void tryHandleExceptionsFromStateUpdater() {
+        if (stateUpdater != null) {
+            final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
+
+            for (final StateUpdater.ExceptionAndTasks exceptionAndTasks : stateUpdater.drainExceptionsAndFailedTasks()) {
+                final RuntimeException exception = exceptionAndTasks.exception();
+                final Set<Task> failedTasks = exceptionAndTasks.getTasks();
+
+                for (final Task failedTask : failedTasks) {
+                    // need to add task back to the bookkeeping to be handled by the stream thread
+                    tasks.addTask(failedTask);
+                    taskExceptions.put(failedTask.id(), exception);
+                }
+            }
+
+            maybeThrowTaskExceptions(taskExceptions);

Review Comment:
   Here we transform a mapping exception -> set of tasks into a mapping task ID -> exception, and then `maybeThrowTaskExceptions()` transforms the mapping task ID -> exception back into a mapping exception -> set of tasks.

   Could we make this more straightforward by skipping the transformation to task ID -> exception?

   Is it really necessary to aggregate all `TaskCorruptedException`s into a new one? I think that by re-throwing a brand new `TaskCorruptedException` we lose the part of the stack trace that shows where the actual exception happened, which might make debugging harder.

   I see that by aggregating the exceptions we can revive all corrupted tasks at once, but I am not sure it is worth losing debugging information. One option could be to not drain the exceptions from the state updater, but to handle them one after the other. Another option could be to collect the original `TaskCorruptedException`s in the new exception and log all of them when the exception is caught.
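
To make the second option concrete, here is a minimal sketch that works directly on the exception -> set of tasks grouping and attaches every original corruption exception to the aggregate as a suppressed exception, so the stack traces are still available when the aggregate is caught and logged. All types here (`ExceptionAndTasks`, `TaskCorrupted`, string task IDs) are hypothetical stand-ins, not the real Kafka Streams classes, and letting a non-corruption failure take precedence is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class StateUpdaterExceptionSketch {

    // Hypothetical stand-in for StateUpdater.ExceptionAndTasks; task IDs are plain strings here.
    static final class ExceptionAndTasks {
        final RuntimeException exception;
        final Set<String> taskIds;

        ExceptionAndTasks(final RuntimeException exception, final Set<String> taskIds) {
            this.exception = exception;
            this.taskIds = taskIds;
        }
    }

    // Hypothetical stand-in for TaskCorruptedException.
    static final class TaskCorrupted extends RuntimeException {
        final Set<String> corruptedTaskIds;

        TaskCorrupted(final Set<String> corruptedTaskIds) {
            super("Tasks " + corruptedTaskIds + " are corrupted");
            this.corruptedTaskIds = corruptedTaskIds;
        }
    }

    // Consumes the exception -> tasks grouping as is, without building a task ID -> exception map.
    static void handle(final List<ExceptionAndTasks> drained) {
        final Set<String> corruptedIds = new LinkedHashSet<>();
        final List<RuntimeException> originals = new ArrayList<>();
        RuntimeException firstFatal = null;

        for (final ExceptionAndTasks entry : drained) {
            if (entry.exception instanceof TaskCorrupted) {
                corruptedIds.addAll(entry.taskIds);
                originals.add(entry.exception);
            } else if (firstFatal == null) {
                firstFatal = entry.exception;
            }
        }

        if (firstFatal != null) {
            // assumption: a non-corruption failure wins; the same instance is rethrown,
            // so its original stack trace is untouched
            throw firstFatal;
        }
        if (!corruptedIds.isEmpty()) {
            final TaskCorrupted aggregate = new TaskCorrupted(corruptedIds);
            // keep the original exceptions reachable for logging at the catch site
            originals.forEach(aggregate::addSuppressed);
            throw aggregate;
        }
    }
}
```

The catch site can then revive all tasks in `corruptedTaskIds` at once and still log each entry of `getSuppressed()` with its own stack trace.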



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -667,6 +672,25 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    public void tryHandleExceptionsFromStateUpdater() {
+        if (stateUpdater != null) {
+            final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
+
+            for (final StateUpdater.ExceptionAndTasks exceptionAndTasks : stateUpdater.drainExceptionsAndFailedTasks()) {
+                final RuntimeException exception = exceptionAndTasks.exception();
+                final Set<Task> failedTasks = exceptionAndTasks.getTasks();
+
+                for (final Task failedTask : failedTasks) {
+                    // need to add task back to the bookkeeping to be handled by the stream thread
+                    tasks.addTask(failedTask);

Review Comment:
   Method `addTask()` does not add the partitions to the `activeTasksPerPartition` map in `Tasks`. We need to extend `addTask()` and consolidate it with `addNewActiveTask()`. 
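
As a rough illustration of what a consolidated `addTask()` could look like, the sketch below registers a task by ID and, for active tasks, also indexes it by each of its input partitions. The class, the `SimpleTask` type, and the string IDs and partitions are hypothetical simplifications and do not mirror the actual `Tasks` implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TasksRegistrySketch {

    // Hypothetical, simplified task: an ID, its input partitions, and an active/standby flag.
    static final class SimpleTask {
        final String id;
        final Set<String> inputPartitions;
        final boolean active;

        SimpleTask(final String id, final Set<String> inputPartitions, final boolean active) {
            this.id = id;
            this.inputPartitions = inputPartitions;
            this.active = active;
        }
    }

    private final Map<String, SimpleTask> allTasksPerId = new HashMap<>();
    private final Map<String, SimpleTask> activeTasksPerPartition = new HashMap<>();

    // Single entry point: every caller gets the same bookkeeping, including the partition index.
    void addTask(final SimpleTask task) {
        if (allTasksPerId.putIfAbsent(task.id, task) != null) {
            throw new IllegalStateException("Task " + task.id + " is already registered");
        }
        if (task.active) {
            for (final String partition : task.inputPartitions) {
                activeTasksPerPartition.put(partition, task);
            }
        }
    }

    // Returns the active task that owns the partition, or null if there is none.
    SimpleTask activeTaskForPartition(final String partition) {
        return activeTasksPerPartition.get(partition);
    }
}
```

With a single method doing both updates, a failed task that is handed back from the state updater cannot end up registered by ID but missing from the partition lookup.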



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -227,34 +227,35 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) {
 
     private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
         for (final Task task : taskWithChangelogs) {
-            final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
+            if (task.state() != State.CLOSED) {
+                final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
 
-            // mark corrupted partitions to not be checkpointed, and then close the task as dirty
-            if (markAsCorrupted) {
-                task.markChangelogAsCorrupted(corruptedPartitions);
-            }
+                // mark corrupted partitions to not be checkpointed, and then close the task as dirty
+                if (markAsCorrupted) {
+                    task.markChangelogAsCorrupted(corruptedPartitions);
+                }
 
-            try {
-                // we do not need to take the returned offsets since we are not going to commit anyways;
-                // this call is only used for active tasks to flush the cache before suspending and
-                // closing the topology
-                task.prepareCommit();
-            } catch (final RuntimeException swallow) {
-                log.error("Error flushing cache for corrupted task {} ", task.id(), swallow);
-            }
+                try {
+                    // we do not need to take the returned offsets since we are not going to commit anyways;
+                    // this call is only used for active tasks to flush the cache before suspending and
+                    // closing the topology
+                    task.prepareCommit();
+                } catch (final RuntimeException swallow) {
+                    log.error("Error flushing cache for corrupted task {} ", task.id(), swallow);
+                }
 
-            try {
-                task.suspend();
+                try {
+                    task.suspend();
 
-                // we need to enforce a checkpoint that removes the corrupted partitions
-                if (markAsCorrupted) {
-                    task.postCommit(true);
+                    // we need to enforce a checkpoint that removes the corrupted partitions
+                    if (markAsCorrupted) {
+                        task.postCommit(true);

Review Comment:
   Aren't we removing the corrupted partitions twice if we use the state updater?


