Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/26 19:04:17 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #10407: KAFKA-12523: handle TaskCorruption for revoked tasks & remove commit in handleCorruption

guozhangwang commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602527847



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -167,18 +167,7 @@ void handleCorruption(final Set<TaskId> corruptedTasks) {
             }
         }
 
-        // Make sure to clean up any corrupted standby tasks in their entirety before committing
-        // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
         closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()

Review comment:
       I dug a bit further, and I think the reason for this commit is here: https://github.com/apache/kafka/pull/8440/files#r407722022
   
   This is specifically for EOS-beta: inside `handleCorruption` we call `task.closeDirty`, which calls `recordCollector.closeDirty`, which in turn calls `producer.abortTxn`. Under EOS-beta all tasks on a stream thread share a single producer, so this aborts the transaction for all tasks, not just the corrupted ones.
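To make the call chain above concrete, here is a minimal Java model. It assumes a simplified stand-in for the thread's producer; `SharedProducer`, `RecordCollector`, and `closeOneTaskDirty` are hypothetical names, not Kafka classes. Two tasks write into one open transaction, and closing only the corrupted task dirty also discards the healthy task's record. (Under EOS-alpha each task has its own producer, so the same abort would stay scoped to one task.)

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Kafka's API: under EOS-beta every task on a stream
// thread sends through one producer, so all tasks share one open transaction.
public class SharedTxnDemo {

    static final class SharedProducer {
        private final List<String> pending = new ArrayList<>();

        void send(String taskId, String record) {
            pending.add(taskId + ":" + record);
        }

        // Aborting discards every record in the open transaction, whichever task sent it.
        void abortTxn() {
            pending.clear();
        }

        List<String> pending() {
            return List.copyOf(pending);
        }
    }

    static final class RecordCollector {
        private final String taskId;
        private final SharedProducer producer;

        RecordCollector(String taskId, SharedProducer producer) {
            this.taskId = taskId;
            this.producer = producer;
        }

        void send(String record) {
            producer.send(taskId, record);
        }

        // Stand-in for the chain task.closeDirty -> recordCollector.closeDirty -> producer.abortTxn.
        void closeDirty() {
            producer.abortTxn();
        }
    }

    // Closes only the corrupted task dirty and returns what is left in the shared transaction.
    static List<String> closeOneTaskDirty() {
        SharedProducer producer = new SharedProducer();
        RecordCollector corrupted = new RecordCollector("0_0", producer);
        RecordCollector healthy = new RecordCollector("0_1", producer);
        corrupted.send("a");
        healthy.send("b");
        corrupted.closeDirty();
        return producer.pending();
    }

    public static void main(String[] args) {
        // The healthy task 0_1 lost its record too.
        System.out.println(closeOneTaskDirty()); // prints []
    }
}
```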
   
   So I feel we should refactor this a bit: in `closeAndRevive` we would not close the task's record collector, since we know the task is going to be revived anyway. Instead we would clean up the collector (e.g. reset the exception flag, clear the sent-future bookkeeping, etc.) to work around the problem. This way we would no longer need the commit in `handleCorruption`. WDYT?
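The proposed refactoring can be sketched as follows, assuming a toy collector whose per-task state is just a sticky send error and a map of sent offsets. `ReviveWithoutAbortDemo`, `resetForRevival`, and the other names are hypothetical and do not correspond to Kafka's `RecordCollectorImpl`. Reviving via a reset leaves the shared transaction open, so no abort reaches the healthy tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposal; names are illustrative, not Kafka's API.
public class ReviveWithoutAbortDemo {

    // One producer, and so one open transaction, shared by all tasks on a thread (EOS-beta).
    static final class SharedProducer {
        final List<String> pending = new ArrayList<>();
        int aborts = 0;

        void abortTxn() {
            pending.clear();
            aborts++;
        }
    }

    static final class RecordCollector {
        private final String taskId;
        private final SharedProducer producer;
        // Per-task state to clean up instead of closing the collector.
        private RuntimeException sendException;
        private final Map<String, Long> sentOffsets = new HashMap<>();

        RecordCollector(String taskId, SharedProducer producer) {
            this.taskId = taskId;
            this.producer = producer;
        }

        void send(String partition, long offset) {
            producer.pending.add(taskId + "@" + partition + ":" + offset);
            sentOffsets.put(partition, offset);
        }

        void recordSendError(RuntimeException e) {
            if (sendException == null) {
                sendException = e;
            }
        }

        // Current path: aborts the transaction that every task on the thread writes into.
        void closeDirty() {
            producer.abortTxn();
        }

        // Proposed path: reset only this collector's state; the shared transaction stays open.
        // Records this task already wrote stay in the transaction; the sketch does not decide
        // how those should be handled.
        void resetForRevival() {
            sendException = null;
            sentOffsets.clear();
        }

        boolean isClean() {
            return sendException == null && sentOffsets.isEmpty();
        }
    }

    // Revive corrupted tasks by resetting their collectors rather than closing them dirty.
    static void closeAndRevive(List<RecordCollector> corrupted) {
        for (RecordCollector collector : corrupted) {
            collector.resetForRevival();
        }
    }

    public static void main(String[] args) {
        SharedProducer producer = new SharedProducer();
        RecordCollector corrupted = new RecordCollector("0_0", producer);
        RecordCollector healthy = new RecordCollector("0_1", producer);
        healthy.send("output-1", 42L);
        corrupted.send("output-0", 7L);
        corrupted.recordSendError(new IllegalStateException("send failed"));

        closeAndRevive(List.of(corrupted));

        System.out.println(producer.aborts);     // prints 0: nothing was aborted
        System.out.println(producer.pending);    // still contains the healthy task's record
        System.out.println(corrupted.isClean()); // prints true
    }
}
```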
   
   Also cc @mjsax, who worked on #8440.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,41 +498,60 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end
+        final Set<TaskId> corruptedTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
-            firstException.compareAndSet(null, e);
-        }
 
-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed later we just
-        // proceed normally, if it is going to be closed we would checkpoint by then
-        if (firstException.get() == null) {
-            for (final Task task : revokedActiveTasks) {
-                try {
-                    task.postCommit(true);
-                } catch (final RuntimeException e) {
-                    log.error("Exception caught while post-committing task " + task.id(), e);
-                    firstException.compareAndSet(null, e);
+            // If we hit a TaskCorruptedException, we should just handle the cleanup for those corrupted tasks right here
+            if (e instanceof TaskCorruptedException) {
+                corruptedTasks.addAll(((TaskCorruptedException) e).corruptedTasks());
+                for (final TaskId taskId : corruptedTasks) {
+                    final Task task = tasks.task(taskId);
+                    task.markChangelogAsCorrupted(task.changelogPartitions());
                 }
+                handleCorruption(corruptedTasks);

Review comment:
       Ditto here: for EOS-beta, we need to make sure this does not incorrectly abort the whole transaction, which affects all tasks.
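The revocation flow in this hunk can be sketched roughly as below, assuming tasks are plain string IDs and the commit is a `Runnable` that may throw. `RevocationFlowDemo`, its local `TaskCorruptedException`, and the recorded action strings are hypothetical stand-ins, not Kafka's classes. It shows the pattern the hunk's comment describes: a corruption failure is handled for just the corrupted tasks, any other failure is captured, every revoked task is still suspended, and the first captured exception is rethrown at the end.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the revocation flow; names are illustrative, not Kafka's.
public class RevocationFlowDemo {

    static final class TaskCorruptedException extends RuntimeException {
        final Set<String> corruptedTasks;

        TaskCorruptedException(Set<String> corruptedTasks) {
            super("corrupted tasks: " + corruptedTasks);
            this.corruptedTasks = corruptedTasks;
        }
    }

    // Returns the actions taken, in order; rethrows the first non-corruption failure.
    static List<String> handleRevocation(List<String> revokedTasks, Runnable commit) {
        List<String> actions = new ArrayList<>();
        AtomicReference<RuntimeException> firstException = new AtomicReference<>();
        Set<String> corrupted = new HashSet<>();
        try {
            commit.run();
        } catch (TaskCorruptedException e) {
            // Handle only the corrupted tasks; per the review, under EOS-beta this must
            // not abort the transaction that the healthy tasks share.
            corrupted.addAll(e.corruptedTasks);
            for (String id : corrupted) {
                actions.add("handleCorruption " + id);
            }
        } catch (RuntimeException e) {
            firstException.compareAndSet(null, e);
        }
        // Even if the commit failed, finish suspending every revoked task.
        for (String id : revokedTasks) {
            actions.add("suspend " + id);
        }
        RuntimeException e = firstException.get();
        if (e != null) {
            throw e;
        }
        return actions;
    }

    public static void main(String[] args) {
        List<String> actions = handleRevocation(List.of("0_0", "0_1"),
            () -> { throw new TaskCorruptedException(Set.of("0_0")); });
        System.out.println(actions); // prints [handleCorruption 0_0, suspend 0_0, suspend 0_1]
    }
}
```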



