Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/13 17:20:57 UTC

[GitHub] [kafka] littlehorse-eng commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

littlehorse-eng commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r969894336


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   @jolshan would that mean that each record sent by the producer would have to include the id of a specific transaction (not just the transactional id of the _producer_)?
   
   If the transactional producer sent a transaction id to the coordinator with each record, rather than just the producer's id (where the coordinator determines whether a transaction is in progress from the order of Start Transaction, Send Record, End Transaction), then this could work. Otherwise, I don't think it's possible to mitigate on the server side.
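   
To make the distinction concrete, here is a toy model of the two schemes. It is not Kafka's coordinator or wire protocol: it ignores epochs and sequence numbers, and every class and method name in it (`ToyCoordinator`, `acceptByOrder`, `acceptByTxnId`) is hypothetical. It only shows why a record that arrives late cannot be told apart from a record of the next transaction when membership is inferred from arrival order alone.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only. NOT Kafka's implementation; all names are hypothetical.
final class ToyCoordinator {
    // For each transactional id: the id of the currently open transaction, if any.
    private final Map<String, Long> openTxn = new HashMap<>();
    private long nextTxnId = 0;

    // Start Transaction: opens a new transaction and returns its (hypothetical) id.
    long begin(String transactionalId) {
        long id = nextTxnId++;
        openTxn.put(transactionalId, id);
        return id;
    }

    // End Transaction (commit or abort): no transaction is open afterwards.
    void end(String transactionalId) {
        openTxn.remove(transactionalId);
    }

    // Scheme A: the record carries only the producer's transactional id.
    // Any record that arrives while a transaction is open is treated as part of it.
    boolean acceptByOrder(String transactionalId) {
        return openTxn.containsKey(transactionalId);
    }

    // Scheme B: the record also names the transaction it was produced for.
    // It is accepted only if that exact transaction is still open.
    boolean acceptByTxnId(String transactionalId, long recordTxnId) {
        Long open = openTxn.get(transactionalId);
        return open != null && open == recordTxnId;
    }

    public static void main(String[] args) {
        ToyCoordinator c = new ToyCoordinator();
        long first = c.begin("producer-1");
        c.end("producer-1");                  // first transaction ends (e.g. aborted)
        long second = c.begin("producer-1");  // producer moves on to a new transaction

        // A delayed record that belonged to the first transaction arrives now.
        System.out.println(c.acceptByOrder("producer-1"));          // true: lands in the wrong transaction
        System.out.println(c.acceptByTxnId("producer-1", first));   // false: stale record is rejected
        System.out.println(c.acceptByTxnId("producer-1", second));  // true: current records still pass
    }
}
```

Under these assumptions, only the second scheme gives the server side enough information to reject the late record.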
   
   @showuon I believe that I've seen this bug cause a violation of EOS with a transactional producer in the case of a broker failure (90% sure). I'd much rather deal with the producer crashing than deal with incorrect behavior. However, if it's possible to fix this issue without causing a producer crash, that would be really nice (:


