You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/12 17:13:50 UTC

[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

jolshan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r968682161


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   I'm wondering, is there a way that we could mitigate this server side? Is it possible to prevent writing the late records after the abort marker? I might be missing something though, so let me know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org