Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/08 10:53:19 UTC

[GitHub] [kafka] urbandan opened a new pull request, #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

urbandan opened a new pull request, #12392:
URL: https://github.com/apache/kafka/pull/12392

   …orting when a delivery timeout is encountered
   
   When a transactional batch encounters a delivery timeout, it can still be in flight. In this situation, if the transaction is aborted, the abort marker might get appended to the log earlier than the in-flight batch. This can block the LSO of a partition indefinitely, or can violate the processing guarantees.
   To avoid this situation, on a delivery timeout, the transactional producer should skip aborting (EndTxnRequest) and bump the epoch instead. When the expected PRODUCER_FENCED error is received, the producer increases the epoch by 1 and retries the InitProducerId request. If the second init succeeds, the producer can continue with the increased epoch. Otherwise, the producer was fenced.
   
   To support this new kind of bump usage, the producer needs to make sure that the producer epoch is not close to exhaustion. The first bump, which will abort the ongoing transaction, always ends with a PRODUCER_FENCED error, thus the producer must bump a second time. If the epoch is exhausted, the producer cannot increase the epoch any further, since the coordinator is required to generate a new producer id for the transactional id.
   To avoid this situation, whenever the producer bumps the epoch without any error, it checks whether the epoch is close to exhaustion. If it is, the producer tries to bump the epoch enough times to force the coordinator to generate a new producer id, which resets the epoch. This ensures that when the next delivery timeout occurs, the producer still has room for the extra bump it needs to continue working.
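
The exhaustion check described above can be sketched as follows. This is only an illustration, not the PR's code: it assumes the producer epoch is a Java `short` (as the `(short) (epoch + 1)` casts in the tests suggest), and the class name, the method name and the margin of two bumps are hypothetical.

```java
class EpochExhaustionSketch {
    // Assumption: epochs are 16-bit, so Short.MAX_VALUE is the highest possible value.
    static final short MAX_EPOCH = Short.MAX_VALUE;

    // Hypothetical margin: recovering from a delivery timeout consumes two epochs,
    // the fencing bump on the coordinator and the follow-up bump the producer keeps.
    static final int BUMPS_NEEDED_FOR_RECOVERY = 2;

    // True when fewer than BUMPS_NEEDED_FOR_RECOVERY epochs remain for this producer id.
    static boolean isCloseToExhaustion(short epoch) {
        // Both operands are promoted to int, so the subtraction cannot overflow.
        return MAX_EPOCH - epoch < BUMPS_NEEDED_FOR_RECOVERY;
    }

    public static void main(String[] args) {
        System.out.println(isCloseToExhaustion((short) 0));     // plenty of room left
        System.out.println(isCloseToExhaustion((short) 32766)); // only one bump left
    }
}
```

A producer that sees `true` here would keep bumping until the coordinator hands out a fresh producer id with the epoch reset.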
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1296391670

   I'm not sure if the fix addresses the following scenario:
   - producer got a timeout (and there is a delayed produce that got stuck)
   - producer crashes before having an opportunity to bump epoch or anything
   - transaction coordinator auto-aborts transaction
   - delayed produce gets unstuck and delivered on top of abort
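
To make the ordering problem in this scenario concrete, here is a toy model of a single partition log. It is a simplification with one transactional producer and is not broker code; the class, the `Kind` enum and the way the last stable offset (LSO) is computed are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class HangingTransactionSketch {
    enum Kind { DATA, ABORT_MARKER }

    private final List<Kind> log = new ArrayList<>();
    // Offset of the first record of the currently open transaction, or -1 if none is open.
    private long firstOpenOffset = -1;

    void append(Kind kind) {
        long offset = log.size();
        log.add(kind);
        if (kind == Kind.DATA && firstOpenOffset < 0) {
            firstOpenOffset = offset; // a transactional write opens a transaction
        } else if (kind == Kind.ABORT_MARKER) {
            firstOpenOffset = -1;     // the marker closes it
        }
    }

    long logEndOffset() {
        return log.size();
    }

    // Toy LSO: the start of the open transaction, or the log end if nothing is open.
    long lastStableOffset() {
        return firstOpenOffset >= 0 ? firstOpenOffset : logEndOffset();
    }

    public static void main(String[] args) {
        HangingTransactionSketch partition = new HangingTransactionSketch();
        partition.append(Kind.DATA);         // batch that was delivered in time
        partition.append(Kind.ABORT_MARKER); // coordinator auto-aborts after the crash
        partition.append(Kind.DATA);         // delayed batch lands on top of the abort
        // No further marker will ever arrive, so the LSO stays behind the log end.
        System.out.println(partition.lastStableOffset() + " < " + partition.logEndOffset());
    }
}
```

In this model nothing ever closes the transaction reopened by the delayed batch, which is the hanging transaction the scenario describes.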




[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r976275357


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   @jolshan sorry for the confusion, I understood that the uniqueness would be achieved through the epoch bumps - I just don't really see the added value of it.





[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r939928412


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);
+        } else {
+            log.info("Cannot bump epoch, transitioning into fatal error");
+            transitionToFatalError(failure);

Review Comment:
   Hi @showuon, do you think this explanation and solution make sense? Or should I look into the other solution, in which the producer stays in a usable state?





[GitHub] [kafka] jolshan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1320546110

   @urbandan By the way, KIP-890 is now available to review 😄 
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense




[GitHub] [kafka] urbandan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1190135063

   @showuon @artemlivshits Can you please take a look at this PR? This is the issue we had a thread about on the dev list.




[GitHub] [kafka] littlehorse-eng commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
littlehorse-eng commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r969894336


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   @jolshan would that mean that each record sent by the producer would have to include the id of a specific transaction (not just the transactional id of the _producer_)?
   
   If the transactional producer sends a transaction id to the coordinator with each record rather than just the producer's id (in which case the coordinator determines whether there is a transaction going on by the order of Start Transaction, Send Record, End Transaction), then this could work. Otherwise, I don't think it's possible to mitigate on the server side.
   
   @showuon I believe that I've seen this bug cause violation of EOS with a transactional producer in the case of a broker failure (90% sure). I'd much rather deal with the producer crashing than deal with incorrect behavior. However, if it's possible to fix this issue without causing a producer crash that would be really nice (:





[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r973165279


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   I'm not suggesting a unique transactional ID, but simply bumping the epoch would give us a unique identifier for the transaction in combination with the producer and/or transaction ID. Again -- this is something I'm considering as a longer term change, and there could be flaws.





[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r972691688


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   @jolshan not sure about the impact it would have on the overhead of transactions, but having a unique ID per transaction doesn't really seem necessary to me





[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r943219606


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   So it looks like after this patch, when a batch expires or a delivery timeout occurs, the producer will enter a fatal error state after bumping the epoch. Before this patch, we would abort the transaction and continue working. Is that right?
   
   Sorry, I hadn't realized this. It will impact current user behavior, so we need more discussion. I'll ping some experts on this PR and hope they can provide comments.
   cc @artemlivshits @ijuma 





[GitHub] [kafka] pnagy-cldr commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
pnagy-cldr commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r923345335


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -168,6 +171,8 @@ private boolean isTransitionValid(State source, State target) {
                     return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
                 case ABORTABLE_ERROR:
                     return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR;
+                case FATAL_BUMPABLE_ERROR:
+                    return source != FATAL_ERROR;

Review Comment:
   It might make sense to have a more fine-tuned restriction on when we can get into FATAL_BUMPABLE_ERROR. For example, we might not want to get there from UNINITIALIZED.
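
One way to express such a restriction is an explicit allow-list of source states instead of `source != FATAL_ERROR`. The sketch below is hypothetical: the enum is a simplified stand-in for the `TransactionManager` states, and which sources belong in the list is an assumption, not what the PR ended up doing.

```java
import java.util.EnumSet;
import java.util.Set;

class BumpableTransitionSketch {
    // Simplified stand-in for the producer's transaction states.
    enum State {
        UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, COMMITTING_TRANSACTION,
        ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_BUMPABLE_ERROR, FATAL_ERROR
    }

    // Assumption: only states in which a transactional batch can still be outstanding
    // may move into FATAL_BUMPABLE_ERROR.
    private static final Set<State> ALLOWED_SOURCES = EnumSet.of(
            State.IN_TRANSACTION,
            State.COMMITTING_TRANSACTION,
            State.ABORTING_TRANSACTION,
            State.ABORTABLE_ERROR);

    static boolean canEnterFatalBumpableError(State source) {
        return ALLOWED_SOURCES.contains(source);
    }

    public static void main(String[] args) {
        System.out.println(canEnterFatalBumpableError(State.UNINITIALIZED));  // rejected
        System.out.println(canEnterFatalBumpableError(State.IN_TRANSACTION)); // allowed
    }
}
```

An allow-list fails closed: a state added to the enum later is rejected until someone decides it should be a valid source.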





[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r923486565


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -168,6 +171,8 @@ private boolean isTransitionValid(State source, State target) {
                     return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
                 case ABORTABLE_ERROR:
                     return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR;
+                case FATAL_BUMPABLE_ERROR:
+                    return source != FATAL_ERROR;

Review Comment:
   Yes, good catch, cleared that one up





[GitHub] [kafka] showuon commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1197620235

   I'll review it this week. Sorry for the delay.




[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r970386896


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   @ijuma I think it would be possible to avoid a fatal state, but it would require a client-side epoch bump.
   When an InitPid is sent during an ongoing transaction, the coordinator bumps the producer epoch to fence off the current producer. This bumped epoch is never returned to any producer as a valid epoch. This never-exposed epoch could be used by the producer to stay in a usable state.
   
   In short:
   epoch=0 -> delivery timeout occurs -> send fencing InitPid -> epoch=1 (on coordinator side) -> increase epoch on client side -> send another InitPid -> safely acquire epoch=2
   
   Since epoch=1 will never be used by another producer, this is a safe operation, and an actual fencing operation (by another producer instance) can be detected.
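
The sequence above can be walked through with a toy coordinator. This is a sketch of the idea as described in this comment, not of the real coordinator: `ToyCoordinator`, the `FENCED` sentinel and `simulate` are hypothetical names, and the coordinator is assumed to answer the fencing InitPid with an error while keeping the bumped epoch to itself.

```java
class ClientSideBumpSketch {
    // Stand-in for a PRODUCER_FENCED error response.
    static final int FENCED = -1;

    static final class ToyCoordinator {
        private int epoch;
        private boolean ongoingTransaction;

        ToyCoordinator(int epoch, boolean ongoingTransaction) {
            this.epoch = epoch;
            this.ongoingTransaction = ongoingTransaction;
        }

        // Returns the newly assigned epoch, or FENCED.
        int initProducerId(int requestEpoch) {
            if (requestEpoch != epoch) {
                return FENCED;              // stale epoch: someone else bumped in between
            }
            epoch++;
            if (ongoingTransaction) {
                ongoingTransaction = false; // the fencing bump aborts the open transaction
                return FENCED;              // the bumped epoch is never handed out
            }
            return epoch;
        }
    }

    // Client side: send the fencing InitPid, then retry once with the locally increased epoch.
    static int recoverAfterDeliveryTimeout(ToyCoordinator coordinator, int clientEpoch) {
        int first = coordinator.initProducerId(clientEpoch);
        if (first != FENCED) {
            return first;
        }
        return coordinator.initProducerId(clientEpoch + 1);
    }

    static int simulate(int coordinatorEpoch, boolean ongoingTransaction, int clientEpoch) {
        return recoverAfterDeliveryTimeout(
                new ToyCoordinator(coordinatorEpoch, ongoingTransaction), clientEpoch);
    }

    public static void main(String[] args) {
        System.out.println(simulate(0, true, 0));  // prints 2: the producer stays usable
        System.out.println(simulate(2, false, 0)); // prints -1: another instance took over
    }
}
```

In the second call the coordinator is already at epoch 2, as it would be after another producer instance fenced this one, so both InitPid attempts fail and the fencing is still detected.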





[GitHub] [kafka] hachikuji commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1295396711

   Thanks for all the discussion here and sorry for the late arrival. I have seen this issue in practice as well, often in the context of hanging transactions. The late-arriving `Produce` request is not expected by the transaction coordinator. Unless the producer is lingering around to continue writing to the transaction, then it is considered hanging by the partition leader. It's also fair to point out that this can violate the transaction's atomicity.
   
   I think the basic idea in the patch here is to bump the epoch when we abort a transaction in order to fence off writes that are still in flight. Do I have that right?
   
   This is in the spirit of an idea that's been on my mind for a while. The only difference is that I was considering a server-side implementation. The basic thought is to have the coordinator bump the epoch after _every_ `EndTxn` request. We would let the bumped epoch be returned in the response.
   
   EndTxnResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
   
   The tuple of `(producerId, epoch)` effectively becomes a unique transaction ID. This would also simplify some of the sequence bookkeeping that we've had so much trouble with on the client. Each transaction would begin with sequence=0 on every partition and the client could "forget" about the inflight requests. Some of the logic we have struggled to get right is how to continue the sequence chain.
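
A rough sketch of how a per-transaction epoch fences late writes is shown below. It is not a proposal for the actual implementation: the class and method names are hypothetical, a single producer id is modelled, and it assumes the partition leader learns the bumped epoch when the transaction ends (for example via the marker write), which this comment does not specify.

```java
class EndTxnBumpSketch {
    private short coordinatorEpoch = 0;
    // Latest epoch the partition leader has seen for this producer id.
    private short partitionEpoch = 0;

    // Toy EndTxn: ends the transaction and returns the bumped epoch,
    // mirroring the ProducerEpoch field added to the response above.
    short endTxn() {
        coordinatorEpoch++;
        partitionEpoch = coordinatorEpoch; // assumption: the partition learns the new epoch here
        return coordinatorEpoch;
    }

    // Toy produce validation: writes carrying an older epoch belong to a finished transaction.
    boolean acceptProduce(short epoch) {
        return epoch >= partitionEpoch;
    }

    public static void main(String[] args) {
        EndTxnBumpSketch broker = new EndTxnBumpSketch();
        short oldEpoch = 0;
        short newEpoch = broker.endTxn();
        System.out.println(broker.acceptProduce(oldEpoch)); // late write from the ended transaction
        System.out.println(broker.acceptProduce(newEpoch)); // first write of the next transaction
    }
}
```

Because every transaction gets its own epoch in this model, the client has no sequence chain to carry over from one transaction to the next.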
   
   There is still a hole, however, which I think @jolshan was describing above. We cannot assume clients will always add partitions correctly to the transaction before beginning to write to the partition. We need a server-side validation. Otherwise, hanging transactions will always be possible. We have seen this so many times by now.
   
   My suggestion here is to let us get a KIP out in the next couple of weeks with a good server-side solution. We may still need a client-side approach for compatibility with older brokers though, so maybe we can leave the PR open.




[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r947670046


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   Yes, that is correct. That abort is causing the issue. The producer just assumes that the batches failed, but it is possible that they are still in-flight. When that happens, the abort marker might get processed earlier than the batch. I've seen this in action, and it corrupts the affected partition permanently.
   
   If it is better to keep the producer in a usable state, I can give it a shot. I had one experiment in which I tried keeping the producer usable by increasing the epoch on the client side once. I believe that it is safe to do as the fencing bump will increase the epoch, and the coordinator will never return that to any clients.
   
   Please let me know what you think @ijuma @artemlivshits @showuon 
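
To make the race concrete, here is a minimal sketch of a toy partition log. It is not broker code: `ToyPartitionLog`, `appendData`, `appendAbortMarker` and `lastStableOffset` are hypothetical names. As a simplification it assumes that a transactional data batch opens a transaction for its producer whenever none is open, and that the LSO cannot advance past the first offset of an open transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: tracks, per producer id, the offset at which an open transaction started.
class ToyPartitionLog {
    private long nextOffset = 0;
    private final Map<Long, Long> openTxnFirstOffset = new HashMap<>();

    // A transactional data batch opens a transaction for the producer if none is open.
    long appendData(long producerId) {
        long offset = nextOffset++;
        openTxnFirstOffset.putIfAbsent(producerId, offset);
        return offset;
    }

    // An abort marker closes whatever transaction is currently open for the producer.
    long appendAbortMarker(long producerId) {
        long offset = nextOffset++;
        openTxnFirstOffset.remove(producerId);
        return offset;
    }

    // The last stable offset is bounded by the earliest still-open transaction.
    long lastStableOffset() {
        long lso = nextOffset;
        for (long first : openTxnFirstOffset.values()) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    boolean hasOpenTransaction(long producerId) {
        return openTxnFirstOffset.containsKey(producerId);
    }
}
```

In this sketch, appending a batch, then the abort marker, then a late batch from the same producer leaves a transaction open that nobody will ever close, so `lastStableOffset()` stays pinned at the late batch's offset however much is appended afterwards.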





[GitHub] [kafka] urbandan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1305253651

   @artemlivshits The scenario you mentioned is already covered, even without this change - when a transaction times out, the transaction coordinator bumps the epoch, so it already fences off the "stuck" produce request.




[GitHub] [kafka] jolshan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1261212204

   @urbandan 
   > If it is better to keep the producer in a usable state, I can give it a shot. I had one experiment in which I tried keeping the producer usable by increasing the epoch on the client side once. I believe that it is safe to do as the fencing bump will increase the epoch, and the coordinator will never return that to any clients
   
   > I think it would be possible to avoid a fatal state, but it would require a client-side epoch bump.
   > When an InitPid is sent during an ongoing transaction, the coordinator bumps the producer epoch to fence off the current producer. This bumped epoch is never returned to any producers as a valid epoch. This never-exposed epoch could be used by the producer to stay in a usable state.
   
   > In short:
   > epoch=0 -> delivery timeout occurs -> send fencing InitPid -> epoch=1 (on coordinator side) -> increase epoch on client side -> send another InitPid -> safely acquire epoch=2
   > Since epoch=1 will never be used by another producer, this is a safe operation, and an actual fencing operation (by another producer instance) can be detected.
   
   Can you elaborate a bit more on this idea? Is this the implementation in the PR now, or was it an idea to avoid the fatal error?




[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r933989629


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);
+        } else {
+            log.info("Cannot bump epoch, transitioning into fatal error");
+            transitionToFatalError(failure);

Review Comment:
   Let me make sure I understand your problem and solution. Are you saying the issue happens only when the **"timed out transaction ID" is not re-used**, and the abort marker arrives earlier than the transaction records? Is my understanding correct?
   
   And what we are trying to do is force an epoch bump when a timeout exception is encountered, so that the fencing mechanism aborts the previous in-flight transactions for us. Then we enter the `fatal error` state as before. Is that right?
   
   If so, then I have a question: what happens if the initPid request fails (i.e. the epoch bump fails)? Will the pending transactions still occur?
   
   Thank you.





[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r934188826


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);
+        } else {
+            log.info("Cannot bump epoch, transitioning into fatal error");
+            transitionToFatalError(failure);

Review Comment:
   Out-of-order messages can occur even when the transactional.id is reused. The issue I encountered was caused by a valid producer aborting the transaction "too soon", where too soon means that all of the last batches had timed out due to delivery.timeout.ms, but they were still in-flight. So the issue occurs with a single producer, without any fencing or transactional.id reuse.
   
   Yes, that summary is right. Bump to fence the in-flight requests, then discard the producer.
   
   If the initPid fails, there can be 2 scenarios:
   1. Transaction times out due to transaction.timeout.ms - in this case, the coordinator bumps the epoch, practically achieving the same fencing I am trying to implement here.
   2. Transactional.id is reused by a new producer instance - in this case, the usual fencing happens.
   So I believe that the essential change here is that the producer must not abort when encountering a client side timeout.
   
   As for the producer going into fatal state - I was thinking about a possible workaround for that, and I think the producer can be kept in a usable state, but it involves the epoch being increased on the client side. If this fatal state solution is not acceptable, I can work on another version of the change which involves this client-side bump. I was hesitant to do so because I wasn't sure if the protocol allows such things, but since the idempotent producer does the same, my guess is that it is safe.





[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r942413117


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);

Review Comment:
   We enqueue the request here, but when will it actually be sent out?





[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r923486245


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -728,18 +779,11 @@ synchronized void maybeResolveSequences() {
                 } else {
                     // We would enter this branch if all in flight batches were ultimately expired in the producer.
                     if (isTransactional()) {
-                        // For the transactional producer, we bump the epoch if possible, otherwise we transition to a fatal error
+                        // For the transactional producer, we bump the epoch if possible, then transition to a fatal error

Review Comment:
   The existing epochBumpRequired flag is used to bump the epoch after an abort. It is usually used to reset the sequence numbers for the producer, and keeps the producer in a usable state.
   In the case I'm trying to fix, we have to skip the abort, and immediately go to the bump. This means that the producer will bump during a transaction, which is handled as a fence by the coordinator. Because of this, there is no way to safely get a new (bumped) epoch with this specific producer instance, and we need to handle this case as a fatal error.
   Once the InitProducerIdRequest succeeds, we transition into the old FATAL_ERROR state.
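
The transitions in this comment can be sketched as a small state machine. This is an illustration only: the state names `FATAL_BUMPABLE_ERROR` and `FATAL_ERROR` come from the diff, while `ToyTxnState`, `onClientSideTimeout` and `onInitProducerIdCompleted` are hypothetical names and not methods of `TransactionManager`.

```java
// Illustration only; not the actual TransactionManager.
class ToyTxnState {
    enum State { IN_TRANSACTION, FATAL_BUMPABLE_ERROR, FATAL_ERROR }

    private State state = State.IN_TRANSACTION;
    private boolean bumpEnqueued = false;

    // On a client-side batch timeout: skip the abort, bump if possible, otherwise fail right away.
    void onClientSideTimeout(boolean canBumpEpoch) {
        if (state == State.FATAL_ERROR || state == State.FATAL_BUMPABLE_ERROR) {
            return; // already in a fatal state, nothing to do
        }
        if (canBumpEpoch) {
            state = State.FATAL_BUMPABLE_ERROR;
            bumpEnqueued = true; // stands in for enqueueing the InitProducerId request
        } else {
            state = State.FATAL_ERROR;
        }
    }

    // Once the fencing InitProducerId round trip finishes, the producer is permanently unusable.
    void onInitProducerIdCompleted() {
        if (state == State.FATAL_BUMPABLE_ERROR) {
            bumpEnqueued = false;
            state = State.FATAL_ERROR;
        }
    }

    State state() { return state; }

    boolean bumpEnqueued() { return bumpEnqueued; }
}
```

The intermediate state exists only so that the fencing request can still be sent; from the application's point of view both states mean the producer instance has to be discarded.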





[GitHub] [kafka] urbandan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1261958861

   @jolshan It is an idea. The first version of the PR tried to implement it, but the current state of the PR is based on the fatal state.
   
   The idea of keeping the producer in a reusable state is kind of tricky. The issue is that to fix the bug, we need to bump the epoch instead of aborting.
   Normally, an epoch bump results in a successful response from the coordinator, which contains the increased epoch, which then can be safely used by the producer to keep working. But bumping an epoch **during** an ongoing transaction is handled differently, because the coordinator assumes that a producer fencing occurred (a new producer instance with the same transactional id started up). Because of this, the response to the bump does not contain an actual epoch - it kicks off the fencing operation, and tells the producer to keep retrying until the fencing operation is finished. When that is done, the coordinator will increase the epoch again, and return it to the new producer.
   An important observation here is that there is an epoch which **is never returned to any producers** by the coordinator. We could rely on this fact by trying to use this "hidden" epoch, by increasing the epoch on the client side. Then we can try to bump the epoch again with this "hidden" epoch. If there were no other producer instances fencing off the current producer, this will succeed, and we will get an increased epoch from the broker, meaning that the producer can safely continue. If there was another producer instance fencing off the current instance, even this "hidden" epoch will be fenced anyway.
   
   In short, as I wrote in the other thread:
   epoch=0 -> delivery timeout occurs -> send fencing InitPid with epoch=0 -> epoch=1 (on coordinator side) -> increase epoch on client side epoch=1 -> send another InitPid with epoch=1 -> safely acquire epoch=2
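
The sequence above can be sketched with a toy coordinator. This is a simplified model under stated assumptions, not Kafka code: `ToyCoordinator`, `ToyProducer`, the `RETRY` and `FENCED` sentinels and all method names are hypothetical, and the model assumes the coordinator answers a bump during an ongoing transaction with a retriable response that carries no epoch.

```java
// Toy model of the coordinator-side epoch handling described above; not Kafka code.
class ToyCoordinator {
    static final short RETRY = -1;  // fencing bump happened, no epoch handed out
    static final short FENCED = -2; // caller's epoch does not match the coordinator's

    private short epoch = 0;
    private boolean txnOngoing = false;

    void beginTransaction() { txnOngoing = true; }

    // InitPid from an existing producer presenting the epoch it believes it owns.
    short initPid(short clientEpoch) {
        if (clientEpoch != epoch) {
            return FENCED;
        }
        epoch++;
        if (txnOngoing) {
            txnOngoing = false; // the ongoing transaction is aborted by the fencing bump
            return RETRY;       // this bumped epoch is the "hidden" one
        }
        return epoch;
    }

    // InitPid from a brand-new producer instance with the same transactional id.
    short initPidFromNewInstance() {
        if (txnOngoing) {
            epoch++;            // fencing bump, never returned to anyone
            txnOngoing = false;
        }
        epoch++;
        return epoch;
    }
}

class ToyProducer {
    // Returns the epoch to continue with, or FENCED if another instance took over.
    static short recoverFromDeliveryTimeout(ToyCoordinator coordinator, short currentEpoch) {
        short first = coordinator.initPid(currentEpoch);
        if (first != ToyCoordinator.RETRY) {
            return first; // either fenced, or no transaction was ongoing
        }
        short hidden = (short) (currentEpoch + 1); // client-side bump to the hidden epoch
        return coordinator.initPid(hidden);
    }
}
```

With an ongoing transaction at epoch 0 and no competing instance, `recoverFromDeliveryTimeout` ends with epoch 2 in this model. If a new instance initializes at any point, the coordinator's epoch moves past what the old producer can present, so the old producer observes `FENCED`.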




[GitHub] [kafka] akatona84 commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
akatona84 commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r923072806


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -728,18 +779,11 @@ synchronized void maybeResolveSequences() {
                 } else {
                     // We would enter this branch if all in flight batches were ultimately expired in the producer.
                     if (isTransactional()) {
-                        // For the transactional producer, we bump the epoch if possible, otherwise we transition to a fatal error
+                        // For the transactional producer, we bump the epoch if possible, then transition to a fatal error

Review Comment:
   This is a behaviour change: when the transactional producer reaches this state, we'll do an epoch bump and then it'll be a fatal error.
   Could you explain what actually changed? What's the difference between flipping the epochBumpRequired flag and going to abortable, versus going to fatal-bumpable?
   Was the producer still usable after the abortable transition (and the handled abort)?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,39 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        RuntimeException failure = cause == null
+                ? new KafkaException("Encountered unrecoverable error due to batch delivery timeout")
+                : new KafkaException("Encountered unrecoverable error due to batch delivery timeout", cause);

Review Comment:
   It can be another timeout exception too (e.g. upon request timeout?)
   





[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r968682161


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   I'm wondering, is there a way that we could mitigate this server side? Is it possible to prevent writing the late records after the abort marker? I might be missing something though, so let me know.





[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r971022152


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   I was thinking that, as longer-term work, we could potentially distinguish transactions by keeping a bit of extra state server-side and by bumping the epoch after each transaction. But maybe this is too large of a change for now.
   
   I think you also came to the conclusion of an epoch bump but through a different path.
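
One possible reading of this idea, as a rough sketch only: if every transaction end bumps the epoch and the partition remembers the newest epoch per producer, a late batch from the previous transaction can be rejected. Nothing here is existing broker behavior or a proposed design from this thread; `ToyEpochCheckingLog`, `tryAppendData` and `endTransaction` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-producer epoch tracking used to reject late batches.
class ToyEpochCheckingLog {
    private final Map<Long, Short> latestEpoch = new HashMap<>();

    // Data is accepted only if its epoch is not older than the newest epoch seen for the producer.
    boolean tryAppendData(long producerId, short epoch) {
        Short latest = latestEpoch.get(producerId);
        if (latest != null && epoch < latest) {
            return false; // batch belongs to an already finished transaction
        }
        latestEpoch.put(producerId, epoch);
        return true;
    }

    // Ending a transaction records the bumped epoch that the next transaction must use.
    void endTransaction(long producerId, short bumpedEpoch) {
        latestEpoch.put(producerId, bumpedEpoch);
    }
}
```

In this sketch a batch written with epoch 0 is accepted, the transaction end moves the producer to epoch 1, and a delayed epoch-0 batch arriving afterwards is refused instead of silently opening a new transaction.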






[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r923482851


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,39 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        RuntimeException failure = cause == null
+                ? new KafkaException("Encountered unrecoverable error due to batch delivery timeout")
+                : new KafkaException("Encountered unrecoverable error due to batch delivery timeout", cause);

Review Comment:
   Yes, thank you for catching that





[GitHub] [kafka] urbandan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1259383078

   @dajac @hachikuji Any chance you can take a look at this? This is a painful issue in transactional producers, with some serious consequences (partition corruption).




[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r942423899


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);

Review Comment:
   At this point we enter the FATAL_BUMPABLE_ERROR state, which still allows the Sender to send requests - see the changes in the Sender class and in TransactionManager#maybeTerminateRequestWithError.
   If the producer is closed gracefully, we will try to send this last InitProducerID request. Once the InitProducerID succeeds, we transition into FATAL_ERROR and won't send anything else.
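
   As a rough sketch of the two fatal states described above (not the actual Sender or TransactionManager logic from the PR): the class `FatalStateSketch`, the `RequestKind` enum and the rule that only InitProducerId may be sent while in the bumpable state are assumptions made for this example.

```java
// Hypothetical sketch of the two fatal states; it does not reproduce
// the real Sender or TransactionManager code.
final class FatalStateSketch {
    enum State { READY, FATAL_BUMPABLE_ERROR, FATAL_ERROR }
    enum RequestKind { PRODUCE, END_TXN, INIT_PRODUCER_ID }

    private State state = State.READY;

    // A client-side timeout moves the producer into the bumpable fatal state,
    // unless it is already in one of the fatal states.
    void onClientSideTimeout() {
        if (state == State.READY) {
            state = State.FATAL_BUMPABLE_ERROR;
        }
    }

    // Assumed rule: while bumpable, only the final InitProducerId may be sent.
    boolean maySend(RequestKind kind) {
        switch (state) {
            case READY:
                return true;
            case FATAL_BUMPABLE_ERROR:
                return kind == RequestKind.INIT_PRODUCER_ID;
            default:
                return false;
        }
    }

    // A successful InitProducerId response ends the bumpable phase.
    void onInitProducerIdSuccess() {
        if (state == State.FATAL_BUMPABLE_ERROR) {
            state = State.FATAL_ERROR;
        }
    }

    State state() {
        return state;
    }
}
```

   Modelling the bumpable phase as its own state keeps the "one last request" exception in a single place instead of scattering special cases across the send path.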





[GitHub] [kafka] viktorsomogyi commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
viktorsomogyi commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1190132574

   @hachikuji would you please review this PR as well?




[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r947888637


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   I personally like the solution of making the producer enter the fatal error state. But I'd like to hear others' opinions, since it will affect the producer's behavior.





[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r976703508


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   Yeah. No worries. I think what I was thinking of would require a bit more effort -- but the idea is that if the server knew the difference between individual transactions, then it could make better decisions about new writes and markers. (I.e., potentially we could avoid appending the records of an old transaction after a marker for that transaction is appended.) But I also think this idea needs a bit more thought and could require more work than what you are proposing here.
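
   Purely as an illustration of the server-side idea above, and not an existing Kafka mechanism: the sketch below assumes a transaction can be identified by the transactional ID plus a per-transaction epoch. The types `TxnAwarePartitionSketch` and `TxnKey` are hypothetical.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical model of the idea only; none of these types exist in Kafka.
final class TxnAwarePartitionSketch {
    // Identifies one transaction, assuming the epoch is bumped per transaction.
    static final class TxnKey {
        final String transactionalId;
        final short epoch;

        TxnKey(String transactionalId, short epoch) {
            this.transactionalId = transactionalId;
            this.epoch = epoch;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TxnKey)) return false;
            TxnKey other = (TxnKey) o;
            return epoch == other.epoch && transactionalId.equals(other.transactionalId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(transactionalId, epoch);
        }
    }

    private final Set<TxnKey> ended = new HashSet<>();
    private int appendedRecords = 0;

    // Appending a commit or abort marker closes that transaction on this partition.
    void appendMarker(TxnKey key) {
        ended.add(key);
    }

    // A record is appended only if its transaction has not been closed yet.
    boolean tryAppendRecord(TxnKey key) {
        if (ended.contains(key)) {
            return false;
        }
        appendedRecords++;
        return true;
    }

    int appendedRecords() {
        return appendedRecords;
    }
}
```

   With such a check, a late batch from a transaction whose marker is already in the log would be rejected rather than appended after the marker.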





[GitHub] [kafka] urbandan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1229897726

   @artemlivshits @ijuma @hachikuji Can you please take a look at this PR? Trying to fix a bug in the transactional producer. Thanks in advance!




[GitHub] [kafka] ijuma commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r969939389


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   Hmm, entering fatal state doesn't seem desirable. @hachikuji @dajac Do you have thoughts on how to avoid that?





[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r973165279


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   I'm not suggesting a unique transactional ID, but simply bumping the epoch would give us a unique identifier for the transaction in combination with the transactional ID. Again -- this is something I'm considering as a longer term change, and there could be flaws.





[GitHub] [kafka] urbandan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

Posted by GitBox <gi...@apache.org>.
urbandan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1295780702

   @hachikuji Thanks for the feedback.
   Yes, the essence of the change is bumping the epoch, but only in case of timeouts, i.e. a delivery timeout or a request timeout.
   
   Overall I agree that a server-side solution might be safer, and I'm interested in the KIP.
   At the same time, it sounds like a pretty big overhaul of the transaction coordination flow, and as you mentioned, will only help in new broker versions.
   What I was aiming to achieve with this change was to provide a bugfix which could even be backported to older versions. Do you think it would make sense to move forward with this fix in the meantime?

