Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/11 10:21:54 UTC

[GitHub] [kafka] nym3r0s opened a new pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

nym3r0s opened a new pull request #9280:
URL: https://github.com/apache/kafka/pull/9280


   KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
   
   If a transaction is aborted with no underlying exception, throw
   a new kind of exception, `TransactionAbortedException`, to
   distinguish this case from other fatal exceptions.
   
   This is part of KIP-654 and resolves KAFKA-10186.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
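
A minimal sketch of how calling code might use the distinction described above. It assumes hypothetical stand-in types (`SketchKafkaException`, `SketchTransactionAbortedException`) rather than the real Kafka client classes, and it assumes the new exception is a subtype of the general one. The `classify` helper and its labels are illustrative only.

```java
// Hypothetical stand-ins: these are NOT Kafka's classes, only same-shaped placeholders.
class SketchKafkaException extends RuntimeException {
    SketchKafkaException(String message) {
        super(message);
    }
}

// Assumed to be a subtype of the general exception for the purpose of this sketch.
class SketchTransactionAbortedException extends SketchKafkaException {
    SketchTransactionAbortedException() {
        super("transaction aborted with pending data");
    }
}

class AbortClassifier {
    /** Describes how a caller might react to the outcome of one pending send. */
    static String classify(Exception failure) {
        if (failure == null) {
            return "SENT";
        }
        // Check the specific type first, because it is also an instance of the general one.
        if (failure instanceof SketchTransactionAbortedException) {
            return "DROPPED_BY_ABORT";
        }
        return "FAILED";
    }

    public static void main(String[] args) {
        System.out.println(classify(null));
        System.out.println(classify(new SketchTransactionAbortedException()));
        System.out.println(classify(new SketchKafkaException("broker unavailable")));
    }
}
```

With a dedicated type, a caller can tell "this record was dropped because the transaction was aborted" apart from other failures by type instead of by matching on the message text.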
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691357934


   Also, it looks like there were checkstyle issues. You probably just need to add the license header to the new file.





[GitHub] [kafka] ableegoldman commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487336115



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
         if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
             if (accumulator.hasIncomplete()) {
+                // Attempt to get the last error that caused this abort.
                 RuntimeException exception = transactionManager.lastError();
+                // If there was no error, but we are still aborting,
+                // then this is most likely a case where there was no fatal error.
                 if (exception == null) {
-                    exception = new KafkaException("Failing batch since transaction was aborted");
+                    exception = new TransactionAbortedException();
                 }
+                // Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment:
       This comment might be unnecessary; the code is pretty self-explanatory in this case 🙂
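
The branch in this hunk can be modelled outside Kafka roughly as follows. This is a toy sketch, not the `Sender` implementation: `ToySender`, `ToyAbortedException`, `append`, `recordError` and `abortPending` are hypothetical names, and `CompletableFuture` stands in for the producer's record futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the dedicated "aborted without an error" exception.
class ToyAbortedException extends RuntimeException {
    ToyAbortedException() {
        super("transaction aborted with pending data");
    }
}

// Toy model: holds pending results and fails them all when the transaction is aborted.
class ToySender {
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();
    private RuntimeException lastError; // stays null when the abort was not caused by a failure

    /** Registers one unsent record and returns the future its caller would wait on. */
    CompletableFuture<Long> append() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    /** Remembers the error that triggered the abort, if there was one. */
    void recordError(RuntimeException error) {
        lastError = error;
    }

    /** Fails every pending future, preferring the recorded error over the fallback type. */
    void abortPending() {
        if (pending.isEmpty()) {
            return;
        }
        RuntimeException exception = lastError;
        if (exception == null) {
            exception = new ToyAbortedException();
        }
        for (CompletableFuture<Long> future : pending) {
            future.completeExceptionally(exception);
        }
        pending.clear();
    }
}
```

The point of the fallback is that a caller waiting on a pending future sees a specific type when the abort was requested directly, and still sees the original error when one caused the abort.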




[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386


   @mjsax @ableegoldman @hachikuji - tagging for awareness. 




[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487394821



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
         if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
             if (accumulator.hasIncomplete()) {
+                // Attempt to get the last error that caused this abort.
                 RuntimeException exception = transactionManager.lastError();
+                // If there was no error, but we are still aborting,
+                // then this is most likely a case where there was no fatal error.
                 if (exception == null) {
-                    exception = new KafkaException("Failing batch since transaction was aborted");
+                    exception = new TransactionAbortedException();
                 }
+                // Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment:
       Aah. Sorry - I'll take that away. 




[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691464378


   @ableegoldman - you're right. I missed the license header. I'll also add tests for the change. Thank you 🙂





[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-704502487


   LGTM!
   
   Looks like the builds just failed with some unrelated flaky tests:
   ```
   Build / JDK 8 / org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
   Build / JDK 8 / kafka.api.TransactionsTest.testBumpTransactionalEpoch
   ```





[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-704718666


   ha - thank you 😄





[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-702794731


   @hachikuji @ableegoldman - I've added a test for the exception. Could you please take a look and let me know if any changes are needed?





[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-699364431


   > @ableegoldman I agree it would be useful to have a test case. If you have time, maybe you can help out in a follow-up?
   
   @hachikuji @ableegoldman - I'm so sorry about the delay. I'm unsure whether to add the test to `Sender` or `RecordAccumulator`. I'm still getting up to speed with the codebase, so any tips would be great. Thank you!





[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-704587824


   I don't think they're fixed on trunk lol 🙂 (besides, I believe Jenkins merges the PR with trunk before running the tests, so it's running the most recent code anyway).
   
   None of the failures seem to be related to this PR, so I wouldn't worry about it. I think this PR is ready to be merged.





[GitHub] [kafka] hachikuji commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r499939576



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2358,6 +2359,41 @@ public void testForceShutdownWithIncompleteTransaction() {
         }
     }
 
+    @Test
+    public void testTransactionAbortedExceptionOnAbortWithoutError() throws InterruptedException, ExecutionException {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        TransactionManager txnManager = new TransactionManager(logContext, "testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions, false);
+
+        setupWithTransactionState(txnManager, false, null);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+        // Begin the transaction
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartitionToTransaction(tp0);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE)));
+        // Run it once so that the partition is added to the transaction.
+        sender.runOnce();
+        // Append a record to the accumulator.
+        FutureRecordMetadata metadata1 = appendToAccumulator(tp0, time.milliseconds(), "key1", "value1");
+        // Now abort the transaction manually.
+        txnManager.beginAbort();
+        // Try to send.
+        // This should abort the existing transaction and
+        // drain all the unsent batches with a TransactionAbortedException.
+        sender.runOnce();
+        // Now attempt to fetch the result for the record.
+        try {
+            // This should fail since we aborted the transaction.
+            metadata1.get();

Review comment:
       We have a helper for this pattern. See `TestUtils.assertFutureThrows`
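
For readers unfamiliar with this kind of helper, the pattern it replaces (call `get()`, catch `ExecutionException`, inspect the cause) can be sketched as below. This is an illustrative stand-in written against the JDK only; `FutureAssertions.assertFutureFailsWith` is a hypothetical name and is not claimed to match the signature or behaviour of `TestUtils.assertFutureThrows`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureAssertions {
    /**
     * Waits for the future and returns its failure cause if it has the expected type.
     * Throws AssertionError if the future succeeds or fails with a different type.
     */
    static <T extends Throwable> T assertFutureFailsWith(Future<?> future, Class<T> expected) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (expected.isInstance(cause)) {
                return expected.cast(cause);
            }
            throw new AssertionError("Expected " + expected.getName() + " but got " + cause, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        // Reaching this point means get() returned normally.
        throw new AssertionError("Expected the future to fail with " + expected.getName());
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("aborted"));
        IllegalStateException cause = assertFutureFailsWith(failed, IllegalStateException.class);
        System.out.println(cause.getMessage());
    }
}
```

Centralising the try/catch keeps each test to a single line and avoids the easy mistake of a test that passes silently when `get()` does not throw.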







[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r500166517



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2358,6 +2359,41 @@ public void testForceShutdownWithIncompleteTransaction() {
         }
     }
 
+    @Test
+    public void testTransactionAbortedExceptionOnAbortWithoutError() throws InterruptedException, ExecutionException {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        TransactionManager txnManager = new TransactionManager(logContext, "testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions, false);
+
+        setupWithTransactionState(txnManager, false, null);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+        // Begin the transaction
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartitionToTransaction(tp0);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE)));
+        // Run it once so that the partition is added to the transaction.
+        sender.runOnce();
+        // Append a record to the accumulator.
+        FutureRecordMetadata metadata1 = appendToAccumulator(tp0, time.milliseconds(), "key1", "value1");
+        // Now abort the transaction manually.
+        txnManager.beginAbort();
+        // Try to send.
+        // This should abort the existing transaction and
+        // drain all the unsent batches with a TransactionAbortedException.
+        sender.runOnce();
+        // Now attempt to fetch the result for the record.
+        try {
+            // This should fail since we aborted the transaction.
+            metadata1.get();

Review comment:
       @hachikuji - Thank you - I've updated the test 👍 







[GitHub] [kafka] hachikuji commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-700349997


   @nym3r0s Take a look at `SenderTest.testForceShutdownWithIncompleteTransaction`. I think we could add a similar test which aborts the transaction directly before the Produce request has been sent. Let us know if you don't have time within the next couple weeks.





[GitHub] [kafka] ableegoldman commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487336115



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
         if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
             if (accumulator.hasIncomplete()) {
+                // Attempt to get the last error that caused this abort.
                 RuntimeException exception = transactionManager.lastError();
+                // If there was no error, but we are still aborting,
+                // then this is most likely a case where there was no fatal error.
                 if (exception == null) {
-                    exception = new KafkaException("Failing batch since transaction was aborted");
+                    exception = new TransactionAbortedException();
                 }
+                // Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment:
       This comment might be unnecessary; the code is pretty self-explanatory in this case 🙂 
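
For readers skimming the hunk, the fallback it introduces can be sketched in isolation. This is only an illustration under stated assumptions: `AbortFallback`, `abortReason`, and the nested `TransactionAbortedException` are hypothetical stand-ins defined here so the example compiles on its own, and the exception message is assumed, not taken from the actual Kafka class.

```java
public class AbortFallback {

    // Hypothetical stand-in for the exception type added by the pull request.
    // The message text is an assumption made for this sketch.
    public static class TransactionAbortedException extends RuntimeException {
        public TransactionAbortedException() {
            super("Failing batch since transaction was aborted");
        }
    }

    // Mirrors the shape of the hunk: keep the recorded error if there is one,
    // otherwise report that the transaction was aborted without an error.
    public static RuntimeException abortReason(RuntimeException lastError) {
        return lastError != null ? lastError : new TransactionAbortedException();
    }

    public static void main(String[] args) {
        RuntimeException recorded = new IllegalStateException("earlier failure");

        // With a recorded error, that same error is used to fail the batches.
        System.out.println(abortReason(recorded).getClass().getSimpleName());

        // Without one, the dedicated exception type is used instead.
        System.out.println(abortReason(null).getClass().getSimpleName());
    }
}
```

The point of a dedicated type is that callers can tell a plain abort apart from an abort caused by an earlier error by checking the exception class.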







[GitHub] [kafka] hachikuji commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-699081923


   @ableegoldman I agree it would be useful to have a test case. If you have time, maybe you can help out in a follow-up?





[GitHub] [kafka] hachikuji merged pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9280:
URL: https://github.com/apache/kafka/pull/9280


   





[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487394821



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
         if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
             if (accumulator.hasIncomplete()) {
+                // Attempt to get the last error that caused this abort.
                 RuntimeException exception = transactionManager.lastError();
+                // If there was no error, but we are still aborting,
+                // then this is most likely a case where there was no fatal error.
                 if (exception == null) {
-                    exception = new KafkaException("Failing batch since transaction was aborted");
+                    exception = new TransactionAbortedException();
                 }
+                // Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment:
       Aah. Sorry - I'll take that away. 











[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-704543777


   @ableegoldman @hachikuji - Should I rebase these changes off of trunk (assuming these tests are fixed on trunk)?





[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386









[GitHub] [kafka] nym3r0s edited a comment on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

Posted by GitBox <gi...@apache.org>.
nym3r0s edited a comment on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-699364431


   > @ableegoldman I agree it would be useful to have a test case. If you have time, maybe you can help out in a follow-up?
   
   @hachikuji @ableegoldman - I'm so sorry about the delay. I'm still getting up to speed with the codebase, so any tips would be great. Thank you!




