Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/18 23:41:26 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

ableegoldman opened a new pull request #8900:
URL: https://github.com/apache/kafka/pull/8900


   If there's any pending data and we haven't flushed the producer when we abort a transaction, a KafkaException is returned for the previous `send`. This is misleading, since the situation is not an unrecoverable error: the KafkaException is really non-fatal. For now, we should just catch and swallow this in the RecordCollector (see also: [KAFKA-10186](https://issues.apache.org/jira/browse/KAFKA-10186)).
   
   The reason we ended up aborting an unflushed transaction was the combination of
   a. always aborting the ongoing transaction when any task is closed/revoked
   b. only committing (and flushing) if at least one of the revoked tasks needs to be committed
   
   Given the above, we can end up with an ongoing transaction that isn't committed, since none of the revoked tasks have any data in the transaction. We then abort the transaction anyway when those tasks are closed. So in addition to swallowing this exception, we should avoid unnecessarily aborting the data of tasks that haven't been revoked.
   
   We can handle this by splitting the RecordCollector's `close` into a dirty and a clean flavor: if dirty, we need to abort the transaction, since it may be dirty due to a failed commit attempt. But if clean, we can skip aborting the transaction, since we know that either we just committed (and thus there is no ongoing transaction to abort), or else the in-flight transaction contains no data from the tasks being closed.
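   
   A minimal sketch of that split (the method names follow this description; the bodies and the `eosEnabled`/`streamsProducer` members are illustrative assumptions, not the merged implementation):
   ```
   public void closeClean() {
       close(false); // no abort: we either just committed, or hold no data in the ongoing transaction
   }
   
   public void closeDirty() {
       close(true);  // a failed commit attempt may have left the transaction dirty, so abort it
   }
   
   private void close(final boolean mayAbortTransaction) {
       if (mayAbortTransaction && eosEnabled) {
           streamsProducer.abortTransaction();
       }
       // common cleanup: remove sensors, close the task producer if needed
   }
   ```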


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443809040



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       Lol, Guozhang, that's like the 3rd time in a week you've been looking at the old branch. I guess kafka-raft is not so up-to-date huh 😛 







[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443877830



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       If we revive a task, we don't recreate the record collector AFAICT. So there may still be a `sendException` hanging around even after we `close` the record collector. If it is a truly fatal exception, we'll check and throw it. But we shouldn't rethrow this particular non-fatal exception. Therefore, we need to check for it and reset the `sendException` iff we find this exact exception.
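   
   A hedged sketch of that check-and-reset, completing the truncated diff above (the swallow branch is what's described here; the exact body is illustrative, not the merged code):
   ```
   private void checkForException() {
       if (sendException != null) {
           if (sendException.getCause() instanceof KafkaException
               && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {
               // non-fatal: the batch failed only because we deliberately aborted the
               // transaction, so swallow it and reset so a revived task won't rethrow it
               sendException = null;
           } else {
               throw sendException;
           }
       }
   }
   ```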







[GitHub] [kafka] mjsax commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648456919


   Retest this please





[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443847867



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       No it is not at all :) https://github.com/confluentinc/kafka/commits/kafka-raft







[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443760619



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be closed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing tasks during shutdown", e);
-            firstException.compareAndSet(null, e);
-        }
 
-        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task);
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
             } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks during shutdown", e);
                 firstException.compareAndSet(null, e);
-                closeTaskDirty(task);
+
+                // If the commit fails, everyone who participated in it must be closed dirty
+                tasksToCloseDirty.addAll(filterActive(tasksToCommit));
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.clear();
+            }
+
+            for (final Task task : tasksToCommit) {

Review comment:
       I guess technically it's only required for eos-beta, but to avoid too many different cases, we might just want to do the same thing for all cases?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444525997



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,92 +675,166 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException));
+        tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException));
+
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task);
+        }
+
+        for (final Task task : activeTaskIterable()) {
+            executeAndMaybeSwallow(
+                clean,

Review comment:
       I think this is more in line with the general code flow elsewhere. Note that if we started out clean but became dirty and had to close some tasks as such, we would have already caught an exception somewhere. So `AtomicReference#compareAndSet` would be a no-op, and it actually doesn't matter what we pass in here
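   
   A minimal standalone illustration of that no-op behavior (demo class and messages are made up):
   ```
   import java.util.concurrent.atomic.AtomicReference;
   
   public class FirstExceptionDemo {
       public static void main(final String[] args) {
           final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
           firstException.compareAndSet(null, new RuntimeException("first"));  // succeeds: reference was null
           firstException.compareAndSet(null, new RuntimeException("second")); // no-op: already non-null
           System.out.println(firstException.get().getMessage()); // prints "first"
       }
   }
   ```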







[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443854106



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted()  {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");

Review comment:
       I guess we don't







[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443871712



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       > I think, that we should never hit this exception during closeClean, that is, we should never call closeClean on a task/record collector that was part of an aborted transaction.
   
   Agreed.
   
   I was hoping we don't need this hack at all? Why do we want to call `checkForException` in close dirty?







[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444526844



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##########
@@ -474,6 +492,7 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
                 " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
         );
 
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

Review comment:
       For those particular tests, considering their names, it seems the tests are void now?







[GitHub] [kafka] chia7712 commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r442671271



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       Should we avoid passing this exception to `ProductionExceptionHandler`, since it no longer breaks the send?







[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443759806



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be closed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());

Review comment:
       > You mean they can be closed clean right? In that case I'd agree :)
   
   I guess :)







[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444522543



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -152,9 +152,9 @@ public StreamsMetadata getLocalMetadata() {
         }
 
         if (globalStores.contains(storeName)) {
-            // global stores are on every node. if we dont' have the host info
+            // global stores are on every node. if we don't have the host info
             // for this host then just pick the first metadata
-            if (thisHost == UNKNOWN_HOST) {
+            if (thisHost.equals(UNKNOWN_HOST)) {

Review comment:
       Will do







[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444517746



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,92 +675,166 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException));
+        tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException));
+
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task);
+        }
+
+        for (final Task task : activeTaskIterable()) {
+            executeAndMaybeSwallow(
+                clean,

Review comment:
       If `tasksToCloseDirty` is not empty, should we close dirty, too, i.e. pass in `clean && tasksToCloseDirty.isEmpty()`?
   







[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r442997760



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       I think we should not add this handling for now, based on the conclusion that after one task causes `abortTxn` to be called, no other tasks should ever call recordCollector#flush/send/close anymore, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -522,7 +522,7 @@ private void close(final boolean clean) {
         if (clean && commitNeeded) {
             log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                           + " commit and should close as dirty instead");
-            throw new StreamsException("Tried to close dirty task as clean");

Review comment:
       This comment is worthy to be a comment on the code itself :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       Hmm, it is used in `streamsMetrics.removeAllTaskLevelSensors(threadId, task.id().toString());` in `cleanupTask`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be closed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());

Review comment:
       Why only add active tasks here, not standby tasks?







[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443938725



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       What about clearing the exception in `checkForException` instead:
   ```
   private void checkForException() {
       if (sendException != null) {
           final KafkaException rethrow = sendException;
           sendException = null;
           throw rethrow;
       }
   }
   ```
   We need to fix `sendException` to make it thread safe though.
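   
   For illustration, one way to make that thread safe, assuming `sendException` becomes an `AtomicReference` (a sketch, not necessarily the merged fix):
   ```
   // requires java.util.concurrent.atomic.AtomicReference
   private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);
   
   private void checkForException() {
       // atomically take and clear the exception, so concurrent producer callbacks
       // and a later task revival cannot observe or rethrow a stale exception
       final KafkaException rethrow = sendException.getAndSet(null);
       if (rethrow != null) {
           throw rethrow;
       }
   }
   ```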







[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443852676



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be closed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }

Review comment:
       Ack







[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444530988



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##########
@@ -474,6 +492,7 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
                 " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
         );
 
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

Review comment:
       I guess I should just break these up into different tests then, huh. Will do







[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444516858



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -152,9 +152,9 @@ public StreamsMetadata getLocalMetadata() {
         }
 
         if (globalStores.contains(storeName)) {
-            // global stores are on every node. if we dont' have the host info
+            // global stores are on every node. if we don't have the host info
             // for this host then just pick the first metadata
-            if (thisHost == UNKNOWN_HOST) {
+            if (thisHost.equals(UNKNOWN_HOST)) {

Review comment:
       Should we fix this in older branches (2.5/2.4), too? (i.e., a follow-up PR)







[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443851041



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -542,7 +542,12 @@ private void close(final boolean clean) {
                     "state manager close",
                     log);
 
-                executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
+                executeAndMaybeSwallow(
+                    clean,
+                    clean ? recordCollector::closeClean : recordCollector::closeDirty,

Review comment:
       I'm just following the pattern we use elsewhere with `closeDirty`/`closeClean`. Personally I think it does make the code a bit more readable so you don't have to then go and look up what the boolean argument to `RecordCollector#close` actually is
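   
   A small illustration of the readability argument (hypothetical interface variants for comparison only):
   ```
   interface BooleanFlavor {
       void close(boolean clean); // call site reads collector.close(true) -- the reader must look up what true means
   }
   
   interface NamedFlavor {
       void closeClean(); // call site reads collector.closeClean() -- self-documenting
       void closeDirty();
   }
   ```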







[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443850536



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be closed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());

Review comment:
       Sounds good. Will refactor a bit to clarify the active/standby handling







[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443237377



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       Makes sense. I think we can avoid `checkForException` in `closeDirty`, and if that is sufficient we can remove the exception handling based on the error message.







[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443849945



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       1. If we are calling from `closeDirty`, then exceptions from `recordCollector.close` would be swallowed and logged.
   
   2. If we are calling from `closeAndRecycleState`, then the upper caller in TaskManager would capture all RuntimeExceptions and move the task to `tasksToCloseDirty`.
   
   3. The only case that needs care is calling from `closeClean`, as Matthias pointed out in the example above.
   
   So looking at it from that side, to make sure errors still get logged, maybe we should keep always calling checkForException in `RecordCollector.close`, while at the same time keeping this hacky error-message handling inside the RecordCollector.







[GitHub] [kafka] guozhangwang commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648298106


   test this





[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444518329



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,92 +675,166 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException));
+        tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException));
+
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task);
+        }
+
+        for (final Task task : activeTaskIterable()) {
+            executeAndMaybeSwallow(
+                clean,
+                () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                e -> firstException.compareAndSet(null, e),
+                e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+            );
+        }
+
+        executeAndMaybeSwallow(
+            clean,
+            activeTaskCreator::closeThreadProducerIfNeeded,
+            e -> firstException.compareAndSet(null, e),
+            e -> log.warn("Ignoring an exception while closing thread producer.", e)
+        );
+
+        tasks.clear();
+
+
+        // this should be called after closing all tasks, to make sure we unlock the task dir for tasks that may
+        // have still been in CREATED at the time of shutdown, since Task#close will not do so
+        executeAndMaybeSwallow(
+            clean,

Review comment:
       As above.







[GitHub] [kafka] guozhangwang merged pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #8900:
URL: https://github.com/apache/kafka/pull/8900


   





[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443238726



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be closed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing tasks during shutdown", e);
-            firstException.compareAndSet(null, e);
-        }
 
-        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task);
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
             } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks during shutdown", e);
                 firstException.compareAndSet(null, e);
-                closeTaskDirty(task);
+
+                // If the commit fails, everyone who participated in it must be closed dirty
+                tasksToCloseDirty.addAll(filterActive(tasksToCommit));
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.clear();
+            }
+
+            for (final Task task : tasksToCommit) {

Review comment:
       Regarding `If any active tasks have to be closed dirty and can't be committed, none of them can be`, is that only for eos-beta?







[GitHub] [kafka] guozhangwang commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648525527


   Merged to trunk and cherry-picked to 2.6





[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443067211



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       In the current code, we might still need to close tasks, right? If a TX is aborted, we need to "reset" all active tasks accordingly and this would imply, closing and reviving them? And while closing we would call `checkForException` and crash without this guard?
   
   That makes me wonder whether we should actually `checkForException` in `closeDirty()` above. If a TX is aborted and we closeDirty without calling `checkForException`, it seems we don't need this guard? (In general, this guard seems a little bit hacky and it would be great if we could avoid it, IMHO.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       I looked into the code, and the answer is "no". It's called within `StreamTask` / `StandbyTask` methods, `closeClean`, `closeDirty`, and `closeAndRecycleState`. So it seems fine?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted()  {

Review comment:
       Good thinking!
   
   We should mention `RecordCollectorImpl`, as it's the class that relies on the error message.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted()  {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");

Review comment:
       Why do we need to set clientId?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());

Review comment:
       Standbys don't affect the TX-producer and thus they can be closed dirty without side effects.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be committed, none of them can be

Review comment:
       typo: `closed`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -542,7 +542,12 @@ private void close(final boolean clean) {
                     "state manager close",
                     log);
 
-                executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
+                executeAndMaybeSwallow(
+                    clean,
+                    clean ? recordCollector::closeClean : recordCollector::closeDirty,

Review comment:
       It seems this is the only place we call `closeDirty`; thus, I am wondering if it might be better to use a boolean flag, i.e., `RecordCollector#close(boolean)`, and just call `() -> recordCollector.close(clean)` here?
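   
   For illustration, that alternative might look roughly like this (just a sketch; `maybeAbortTransaction()` is a placeholder for whatever the actual EOS abort logic is):
   ```
   // RecordCollector (sketch): one close method with a flag instead of two
   void close(boolean clean);
   
   // RecordCollectorImpl (sketch)
   @Override
   public void close(final boolean clean) {
       log.info("Closing record collector {}", clean ? "clean" : "dirty");
       if (!clean) {
           // only a dirty close may need to abort an in-flight transaction
           maybeAbortTransaction();  // placeholder for the EOS abort logic
       }
       // ... cleanup shared by both paths ...
   }
   ```
   The `StreamTask` call site would then just be `executeAndMaybeSwallow(clean, () -> recordCollector.close(clean), "record collector close", log);`.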

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted()  {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txnId");
+
+        final KafkaProducer<String, String> producer =
+            new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer());
+
+        final ProducerRecord<String, String> record = new ProducerRecord<>(SINGLE_PARTITION_INPUT_TOPIC, "value");
+
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        producer.send(record);

Review comment:
       Do we need this?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing tasks during shutdown", e);
-            firstException.compareAndSet(null, e);
-        }
 
-        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task);
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
             } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks during shutdown", e);
                 firstException.compareAndSet(null, e);
-                closeTaskDirty(task);
+
+                // If the commit fails, everyone who participated in it must be closed dirty
+                tasksToCloseDirty.addAll(filterActive(tasksToCommit));
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.clear();
+            }
+
+            for (final Task task : tasksToCommit) {

Review comment:
       Similar question as above: should we move this loop into the same try-catch as `commitOffsetsOrTransaction`? This would make it clear that `postCommit()` should only be executed if committing was successful (even if the current code is correct, as `tasksToCommit` would be empty if an error occurred).
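   
   I.e., roughly (a sketch, reusing the existing error handling from the diff above):
   ```
   try {
       commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
       // postCommit() only runs if the commit itself succeeded
       for (final Task task : tasksToCommit) {
           try {
               task.postCommit();
           } catch (final RuntimeException e) {
               log.error("Exception caught while post-committing task " + task.id(), e);
               firstException.compareAndSet(null, e);
           }
       }
   } catch (final RuntimeException e) {
       log.error("Exception caught while committing tasks during shutdown", e);
       firstException.compareAndSet(null, e);
       // if the commit fails, everyone who participated in it must be closed dirty
       tasksToCloseDirty.addAll(filterActive(tasksToCommit));
       tasksToCloseClean.removeAll(filterActive(tasksToCommit));
       tasksToCommit.clear();
   }
   ```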

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted()  {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txnId");
+
+        final KafkaProducer<String, String> producer =
+            new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer());

Review comment:
       Should we use try-with-resource? Or at least try-final and call `producer.close` in the finally block?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted()  {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txnId");
+
+        final KafkaProducer<String, String> producer =
+            new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer());
+
+        final ProducerRecord<String, String> record = new ProducerRecord<>(SINGLE_PARTITION_INPUT_TOPIC, "value");
+
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        producer.send(record);
+
+        final AtomicReference<Exception> receivedException = new AtomicReference<>(null);
+        producer.send(record, (recordMetadata, exception) -> receivedException.compareAndSet(null, exception));

Review comment:
       Do we need to make some producer config changes to ensure that the producer does not flush the record by chance (e.g., increase `linger.ms` to `MAX_VALUE` or similar)?
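   
   E.g., something like this in the test setup (a sketch; `LINGER_MS_CONFIG` is the standard producer config):
   ```
   // with a huge linger the producer never flushes on its own, so the pending
   // record is guaranteed to still be buffered when we abort the transaction
   configs.put(ProducerConfig.LINGER_MS_CONFIG, Integer.MAX_VALUE);
   ```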

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }

Review comment:
       Should we do an `else` here and only call `commitOffsetsOrTransaction` (and do the `postCommit` loop) in the `else` branch (I understand that the current code is correct, as `commitOffsetOrTx()` would be a no-op for this case -- just wondering, if the code would be easier to read?)




[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443237513



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       You're right, I was looking at another branch (kafka-raft) again... jumping back and forth reviewing those PRs keeps confusing me, sorry.




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443854750



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted()  {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txnId");
+
+        final KafkaProducer<String, String> producer =
+            new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer());
+
+        final ProducerRecord<String, String> record = new ProducerRecord<>(SINGLE_PARTITION_INPUT_TOPIC, "value");
+
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        producer.send(record);

Review comment:
       Nope, that just slipped in there 




[GitHub] [kafka] ableegoldman commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648404327


   One unrelated failure: `PlaintextProducerSendTest.testNonBlockingProducer`


[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444517907



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,92 +675,166 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException));
+        tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException));
+
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task);
+        }
+
+        for (final Task task : activeTaskIterable()) {
+            executeAndMaybeSwallow(
+                clean,
+                () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                e -> firstException.compareAndSet(null, e),
+                e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+            );
+        }
+
+        executeAndMaybeSwallow(
+            clean,

Review comment:
       As above?




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443864310



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       I _think_ that we should never hit this exception during `closeClean`, that is, we should never call `closeClean` on a task/record collector that was part of an aborted transaction.
   
   The transaction will be aborted if a task had to be closed dirty due to:
   1. suspend/preCommit/commit failed: in this case, any task that has an ongoing transaction will throw an exception at the beginning of `closeClean` and then be closed dirty
   2. postCommit/closeClean fails: in this case, one task might throw during `postCommit` or `closeClean` and cause it to be closed dirty while all other tasks are closed clean. But by definition, we have just committed the transaction, so there is no in-flight txn to be aborted
   
   So, I think it is ok to just do this special exception handling inside `closeDirty`, and reset the `sendException` to `null` if we find this exception. WDYT?




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443875108



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       Ok, we can do that, but we only get the first exception back anyway. -- And for this, we also don't need the "hack" to check for a specific exception. In `closeDirty()` we just do
   ```
   try {
     checkForException();
   } catch (final RuntimeException logAndSwallow) {
      log.error(...);
   }
   ```




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443950906



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##########
@@ -474,6 +492,7 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
                 " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
         );
 
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

Review comment:
       These tests used to rely on the fact that the `sendException` was never forgotten: it was set once and then multiple subsequent calls were asserted to also throw it. So now we need to call `send` before each assertion to re-insert the exception
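   
   I.e., the pattern in these tests now looks roughly like this (sketch):
   ```
   // each assertion needs its own failed send, since the collector now forgets
   // the sendException once it has been surfaced
   collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
   assertThrows(TaskMigratedException.class, collector::flush);
   
   collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
   assertThrows(TaskMigratedException.class, collector::closeClean);
   ```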




[GitHub] [kafka] abbccdda commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443855731



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -250,10 +250,26 @@ public void flush() {
      * @throws TaskMigratedException recoverable error that would cause the task to be removed
      */
     @Override
-    public void close() {
-        log.info("Closing record collector");
+    public void closeClean() {
+        log.info("Closing record collector clean");
+
+        // No need to abort transaction during a clean  close: either we have successfully committed the ongoing
+        // transaction during handleRevocation and thus there is no transaction in flight, or else none of the revoked
+        // tasks had any data in the current transaction and therefore there is no need to commit or abort it

Review comment:
       nit: period at the end

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -250,10 +250,26 @@ public void flush() {
      * @throws TaskMigratedException recoverable error that would cause the task to be removed
      */
     @Override
-    public void close() {
-        log.info("Closing record collector");
+    public void closeClean() {
+        log.info("Closing record collector clean");
+
+        // No need to abort transaction during a clean  close: either we have successfully committed the ongoing

Review comment:
       nit: clean close




[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443237917



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());

Review comment:
       You mean `they can be closed clean`, right? In that case I'd agree :) Realized that we are still closing standbys as clean even if one of the active tasks causes all other active tasks to be closed dirty. The looping over all tasks above is a bit confusing to me.
   
   Maybe a subjective nit suggestion: first loop over the actives, then loop over the standbys; in the second loop we would not need the one-spoils-all logic anymore. Although it is a bit of duplicated code, it would make the logic cleaner.
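   
   Roughly (a sketch of the suggested structure, assuming a `standbyTaskIterable()` analogous to `activeTaskIterable()`):
   ```
   // first pass: actives only -- one dirty close spoils the whole transaction
   for (final Task task : activeTaskIterable()) {
       // try suspend()/prepareCommit(); on failure add the task to tasksToCloseDirty
   }
   if (!tasksToCloseDirty.isEmpty()) {
       tasksToCloseDirty.addAll(activeTaskIterable());
   }
   
   // second pass: standbys -- no transaction involved, so each standby can be
   // committed and closed clean or dirty on its own
   for (final Task task : standbyTaskIterable()) {
       // try suspend()/prepareCommit(); on failure close just this one dirty
   }
   ```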




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444538711



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##########
@@ -474,6 +492,7 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
                 " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
         );
 
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

Review comment:
       Not 100% sure how `OnSubsequentCall` is meant either. But what you say seems to make sense, and thus they should be separate tests. Thanks for going the extra mile splitting them up!




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r442619459



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -522,7 +522,7 @@ private void close(final boolean clean) {
         if (clean && commitNeeded) {
             log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                           + " commit and should close as dirty instead");
-            throw new StreamsException("Tried to close dirty task as clean");

Review comment:
       This was a sort-of bug: because we don't close things during `handleRevocation`, we want to make sure the TM will close this as dirty during `handleAssignment`. So we throw this just to force it to call `closeDirty` -- but it wasn't necessarily a fatal exception that caused commit to fail, so we should just throw TaskMigrated here.
   That said, it doesn't really matter since the ConsumerCoordinator will save and rethrow only the first exception, which is the `handleRevocation` exception. Anything we throw in `handleAssignment` is "lost" -- but we should do the right thing anyway
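   
   For illustration, a sketch of what that would look like in `StreamTask#close` (the message text is just illustrative):
   ```
   if (clean && commitNeeded) {
       // we failed to commit during handleRevocation, so force the TaskManager
       // to close this task dirty instead of treating the situation as fatal
       log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                     + " commit and should close as dirty instead");
       throw new TaskMigratedException("Tried to close dirty task as clean");
   }
   ```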




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r442619557



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       My IDEA pointed out that this was never used




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443950646



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
##########
@@ -359,7 +359,9 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore()
         waitForStandbyCompletion(client1, 1, 30 * 1000L);
         waitForStandbyCompletion(client2, 1, 30 * 1000L);
 
-        assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(0));
+        // Sometimes the store happens to have already been closed sometime during startup, so just keep track
+        // of where it started and make sure it doesn't happen more times from there
+        final int initialStoreCloseCount = CloseCountingInMemoryStore.numStoresClosed();

Review comment:
       Saw this fail locally so just did a minor flaky test fix on the side




[GitHub] [kafka] guozhangwang commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443873843



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       Well, to still log the error message while swallowing it :)




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443808226



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -522,7 +522,7 @@ private void close(final boolean clean) {
         if (clean && commitNeeded) {
             log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                           + " commit and should close as dirty instead");
-            throw new StreamsException("Tried to close dirty task as clean");

Review comment:
       ack




[GitHub] [kafka] guozhangwang commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648472153


   LGTM! I will merge after green jenkins.


[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443852154



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing tasks during shutdown", e);
-            firstException.compareAndSet(null, e);
-        }
 
-        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task);
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
             } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks during shutdown", e);
                 firstException.compareAndSet(null, e);
-                closeTaskDirty(task);
+
+                // If the commit fails, everyone who participated in it must be closed dirty
+                tasksToCloseDirty.addAll(filterActive(tasksToCommit));
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.clear();
+            }
+
+            for (final Task task : tasksToCommit) {

Review comment:
       Yeah, I agree with Matthias: with eos-alpha we don't currently differentiate which tasks may have succeeded the commit and which may have failed, so this would require a larger refactoring. My take is that it doesn't really matter much, since practically speaking it's unlikely that one task would successfully commit while other tasks would fail.
   (ack on moving into the same try-catch)




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443871959



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -542,7 +542,12 @@ private void close(final boolean clean) {
                     "state manager close",
                     log);
 
-                executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
+                executeAndMaybeSwallow(
+                    clean,
+                    clean ? recordCollector::closeClean : recordCollector::closeDirty,

Review comment:
       Sure. Was just an idea.




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444537297



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -152,9 +152,9 @@ public StreamsMetadata getLocalMetadata() {
         }
 
         if (globalStores.contains(storeName)) {
-            // global stores are on every node. if we dont' have the host info
+            // global stores are on every node. if we don't have the host info
             // for this host then just pick the first metadata
-            if (thisHost == UNKNOWN_HOST) {
+            if (thisHost.equals(UNKNOWN_HOST)) {

Review comment:
       https://github.com/apache/kafka/pull/8919




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444528622



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##########
@@ -474,6 +492,7 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
                 " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
         );
 
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

Review comment:
       Maybe I misinterpreted this, but I took the `OnSubsequentCall` in the name to mean that it would throw on the next (i.e. subsequent) call after the _send_, not that it would continue to throw on all subsequent calls. In other words, I think this should actually be several different tests (one for each "call" that should throw) that got mashed into just one. A sketch of the split follows below.
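
   Roughly what that split could look like, assuming the same fenced-producer setup as the existing test (the test names and `assertThrows` usage here are illustrative, not from the actual file):
   ```
   @Test
   public void shouldThrowTaskMigratedOnSendAfterProducerFenced() {
       // the fence surfaces asynchronously, so it's the *next* send that throws
       collector.send(topic, "0", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
       assertThrows(TaskMigratedException.class, () ->
           collector.send(topic, "1", "0", null, null, stringSerializer, stringSerializer, streamPartitioner));
   }

   @Test
   public void shouldThrowTaskMigratedOnFlushAfterProducerFenced() {
       collector.send(topic, "0", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
       assertThrows(TaskMigratedException.class, collector::flush);
   }
   ```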







[GitHub] [kafka] mjsax commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648457282


   Retest this please.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443855976



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted()  {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txnId");
+
+        final KafkaProducer<String, String> producer =
+            new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer());
+
+        final ProducerRecord<String, String> record = new ProducerRecord<>(SINGLE_PARTITION_INPUT_TOPIC, "value");
+
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        producer.send(record);
+
+        final AtomicReference<Exception> receivedException = new AtomicReference<>(null);
+        producer.send(record, (recordMetadata, exception) -> receivedException.compareAndSet(null, exception));

Review comment:
       Hm, yeah, good call. I didn't realize this defaults to 0.
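
   For the record, the rest of the test (below the hunk) presumably aborts the transaction and asserts on the captured exception, along these lines (a sketch; the exact matchers in the PR may differ):
   ```
   producer.abortTransaction();

   // the un-flushed record's callback should have received the non-fatal
   // KafkaException with this specific message
   final Exception exception = receivedException.get();
   assertThat(exception, instanceOf(KafkaException.class));
   assertThat(exception.getMessage(), equalTo("Failing batch since transaction was aborted"));
   producer.close();
   ```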










[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443828344



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
       That sounds right; we should never see this exception outside of `closeDirty`, since we should be closing all tasks dirty whenever the transaction is aborted.
   But as for whether to check it in `closeDirty`, I think we would at the very least need to check it so we can reset the exception afterwards. Or do you think it's "safe" to just blindly reset the exception on a dirty close, no matter what it was?
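
   A sketch of the "check and reset" option, assuming `sendException` is still the plain field shown in the hunk above:
   ```
   // inside closeDirty(): swallow and clear any pending send exception so a
   // later checkForException() cannot re-throw an error we already handled
   if (sendException != null) {
       log.error("Swallowing pending send exception during dirty close", sendException);
       sendException = null;
   }
   ```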



