Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/01 06:47:22 UTC

[GitHub] [kafka] urbandan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r934188826


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);
+        } else {
+            log.info("Cannot bump epoch, transitioning into fatal error");
+            transitionToFatalError(failure);

Review Comment:
   Out-of-order messages can occur even when the transactional.id is not reused. The issue I encountered was caused by a valid producer aborting the transaction "too soon": all of the last batches had been timed out on the client by delivery.timeout.ms, but their requests were still in flight. So the issue occurs with a single producer, without any fencing or transactional.id reuse.
   
   Yes, that summary is right. Bump to fence the in-flight requests, then discard the producer.
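
The race can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not Kafka code: `FencingSketch`, `PartitionLog`, `fenceAndAbort`, and the string records are all hypothetical names, and the broker is reduced to a single list that rejects writes carrying an epoch older than the one it last saw.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Kafka code) of a late in-flight batch arriving after an abort. */
final class FencingSketch {

    /** A partition log that rejects appends from an epoch older than the current one. */
    static final class PartitionLog {
        private short currentEpoch;
        private final List<String> entries = new ArrayList<>();

        PartitionLog(short epoch) {
            this.currentEpoch = epoch;
        }

        /** Appends the record if the writer's epoch is current; returns false if the writer is fenced. */
        boolean append(short writerEpoch, String record) {
            if (writerEpoch < currentEpoch) {
                return false;
            }
            entries.add(record);
            return true;
        }

        /** Simplified stand-in for an epoch bump that also aborts the open transaction. */
        void fenceAndAbort() {
            currentEpoch++;
            entries.add("ABORT_MARKER");
        }

        List<String> entries() {
            return entries;
        }
    }

    /** The client expires a batch locally and aborts with the same epoch; the batch then lands late. */
    static List<String> abortTooSoon() {
        PartitionLog log = new PartitionLog((short) 0);
        log.append((short) 0, "ABORT_MARKER"); // abort issued while the batch is still in flight
        log.append((short) 0, "late-batch");   // accepted: same epoch, so it lands after the marker
        return log.entries();
    }

    /** The epoch is bumped first, so the late batch carries a stale epoch and is rejected. */
    static List<String> bumpInsteadOfAbort() {
        PartitionLog log = new PartitionLog((short) 0);
        log.fenceAndAbort();
        log.append((short) 0, "late-batch");   // rejected: epoch 0 is older than the current epoch 1
        return log.entries();
    }
}
```

In this model `abortTooSoon()` leaves the late batch in the log after the abort marker, while `bumpInsteadOfAbort()` leaves only the marker.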
   
   If the InitProducerId request fails, there are two scenarios:
   1. The transaction times out due to transaction.timeout.ms. In this case the coordinator bumps the epoch, which achieves practically the same fencing I am trying to implement here.
   2. The transactional.id is reused by a new producer instance. In this case the usual fencing happens.
   
   So I believe the essential change here is that the producer must not abort when it encounters a client-side timeout.
   
   As for the producer going into a fatal state: I was thinking about a possible workaround, and I think the producer can be kept in a usable state, but that requires increasing the epoch on the client side. If the fatal-state solution is not acceptable, I can work on another version of the change that does this client-side bump. I was hesitant to do so because I wasn't sure whether the protocol allows it, but since the idempotent producer does the same, my guess is that it is safe.
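
For illustration, a client-side bump could look roughly like the sketch below. It is a guess at the shape of such a change, not the PR's code and not Kafka's `TransactionManager`: `ClientSideBumpSketch`, `IdAndEpoch`, and `bumpEpochLocally` are hypothetical names. It also assumes that sequence numbers restart at 0 after the bump, and that an exhausted epoch has to be handled some other way.

```java
/** Hypothetical sketch of a local epoch bump; not Kafka's TransactionManager. */
final class ClientSideBumpSketch {

    /** Immutable pair of producer ID and epoch (illustrative stand-in). */
    static final class IdAndEpoch {
        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private IdAndEpoch current;
    private int nextSequence;

    ClientSideBumpSketch(long producerId, short epoch) {
        this.current = new IdAndEpoch(producerId, epoch);
        this.nextSequence = 0;
    }

    /** Returns the sequence number for the next batch and advances the counter. */
    int nextSequence() {
        return nextSequence++;
    }

    /**
     * Bumps the epoch locally and restarts sequence numbering (an assumption of this sketch).
     * Returns false when the epoch cannot be incremented any further.
     */
    boolean bumpEpochLocally() {
        if (current.epoch == Short.MAX_VALUE) {
            return false; // epoch exhausted; the caller needs a different recovery path
        }
        current = new IdAndEpoch(current.producerId, (short) (current.epoch + 1));
        nextSequence = 0;
        return true;
    }

    IdAndEpoch current() {
        return current;
    }
}
```

The producer ID stays the same and only the epoch changes, so batches still in flight with the old epoch become distinguishable from anything sent afterwards.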


