You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/10 12:57:14 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r942413117


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);

Review Comment:
   We enqueue request here, and when will we send out the request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org