You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/12/06 19:41:39 UTC

[kafka] branch trunk updated: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error (#12915)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5aad085a8e7 KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error (#12915)
5aad085a8e7 is described below

commit 5aad085a8e7514c14a17121d316a2e2b2add8bcc
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Tue Dec 6 11:41:31 2022 -0800

    KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error (#12915)
    
    The broker may return the `REQUEST_TIMED_OUT` error in `InitProducerId` responses when allocating the ID using the `AllocateProducerIds` request. The client currently does not handle this. Instead of retrying as we would expect, the client raises a fatal exception to the application.
    
    In this patch, we address this problem by modifying the producer to handle `REQUEST_TIMED_OUT` and any other retriable errors by re-enqueuing the request.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../producer/internals/TransactionManager.java     | 10 ++--
 .../errors/ConcurrentTransactionsException.java    |  2 +-
 .../producer/internals/TransactionManagerTest.java | 57 ++++++++++++++++++++++
 3 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 5aab62eaf22..de5a6ced41c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1290,7 +1290,7 @@ public class TransactionManager {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error.exception() instanceof RetriableException) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
@@ -1347,7 +1347,7 @@ public class TransactionManager {
                     maybeOverrideRetryBackoffMs();
                     reenqueue();
                     return;
-                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                } else if (error.exception() instanceof RetriableException) {
                     reenqueue();
                     return;
                 } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
@@ -1467,7 +1467,7 @@ public class TransactionManager {
                 }
                 result.done();
                 log.info("Discovered {} coordinator {}", coordinatorType.toString().toLowerCase(Locale.ROOT), node);
-            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
+            } else if (error.exception() instanceof RetriableException) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
@@ -1515,7 +1515,7 @@ public class TransactionManager {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error.exception() instanceof RetriableException) {
                 reenqueue();
             } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
                 // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
@@ -1572,7 +1572,7 @@ public class TransactionManager {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error.exception() instanceof RetriableException) {
                 reenqueue();
             } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
                 abortableErrorIfPossible(error.exception());
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
index 6ad6b8a3ddb..118b4de50aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.errors;
 
-public class ConcurrentTransactionsException extends ApiException {
+public class ConcurrentTransactionsException extends RetriableException {
     private static final long serialVersionUID = 1L;
 
     public ConcurrentTransactionsException(final String message) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index b6bf9e6f4f1..ce9b8052207 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -77,6 +77,8 @@ import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -1678,6 +1680,61 @@ public class TransactionManagerTest {
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @EnumSource(names = {
+            "UNKNOWN_TOPIC_OR_PARTITION",
+            "REQUEST_TIMED_OUT",
+            "COORDINATOR_LOAD_IN_PROGRESS",
+            "CONCURRENT_TRANSACTIONS"
+    })
+    public void testRetriableErrors2(Errors error) {
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = error.equals(Errors.CONCURRENT_TRANSACTIONS) ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+    }
+
     @Test
     public void testProducerFencedExceptionInInitProducerId() {
         verifyProducerFencedForInitProducerId(Errors.PRODUCER_FENCED);