You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/19 19:00:33 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r442997760



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       I think we should not add this handling for now based on the conclusion that after one task caused `abortTxn` is called, no other tasks should ever call recordCollector#flush/send/close anymore right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -522,7 +522,7 @@ private void close(final boolean clean) {
         if (clean && commitNeeded) {
             log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                           + " commit and should close as dirty instead");
-            throw new StreamsException("Tried to close dirty task as clean");

Review comment:
       This comment is worthy to be a comment on the code itself :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       Hmm, it is used in `streamsMetrics.removeAllTaskLevelSensors(threadId, task.id().toString());` in `cleanupTask`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());

Review comment:
       Why only add active tasks here, not standby tasks?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org