You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/12/06 19:41:39 UTC
[kafka] branch trunk updated: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error (#12915)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5aad085a8e7 KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error (#12915)
5aad085a8e7 is described below
commit 5aad085a8e7514c14a17121d316a2e2b2add8bcc
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Tue Dec 6 11:41:31 2022 -0800
KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error (#12915)
The broker may return the `REQUEST_TIMED_OUT` error in `InitProducerId` responses when allocating the ID using the `AllocateProducerIds` request. The client currently does not handle this. Instead of retrying as we would expect, the client raises a fatal exception to the application.
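In this patch, we address this problem by modifying the producer to handle `REQUEST_TIMED_OUT`, and any other retriable error, by re-enqueuing the request. `ConcurrentTransactionsException` now extends `RetriableException`, so `CONCURRENT_TRANSACTIONS` is still retried under the new `RetriableException` check.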
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../producer/internals/TransactionManager.java | 10 ++--
.../errors/ConcurrentTransactionsException.java | 2 +-
.../producer/internals/TransactionManagerTest.java | 57 ++++++++++++++++++++++
3 files changed, 63 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 5aab62eaf22..de5a6ced41c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1290,7 +1290,7 @@ public class TransactionManager {
} else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
reenqueue();
- } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+ } else if (error.exception() instanceof RetriableException) {
reenqueue();
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
@@ -1347,7 +1347,7 @@ public class TransactionManager {
maybeOverrideRetryBackoffMs();
reenqueue();
return;
- } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+ } else if (error.exception() instanceof RetriableException) {
reenqueue();
return;
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
@@ -1467,7 +1467,7 @@ public class TransactionManager {
}
result.done();
log.info("Discovered {} coordinator {}", coordinatorType.toString().toLowerCase(Locale.ROOT), node);
- } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
+ } else if (error.exception() instanceof RetriableException) {
reenqueue();
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
@@ -1515,7 +1515,7 @@ public class TransactionManager {
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
reenqueue();
- } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+ } else if (error.exception() instanceof RetriableException) {
reenqueue();
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
@@ -1572,7 +1572,7 @@ public class TransactionManager {
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
reenqueue();
- } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+ } else if (error.exception() instanceof RetriableException) {
reenqueue();
} else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
abortableErrorIfPossible(error.exception());
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
index 6ad6b8a3ddb..118b4de50aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.errors;
-public class ConcurrentTransactionsException extends ApiException {
+public class ConcurrentTransactionsException extends RetriableException {
private static final long serialVersionUID = 1L;
public ConcurrentTransactionsException(final String message) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index b6bf9e6f4f1..ce9b8052207 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -77,6 +77,8 @@ import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -1678,6 +1680,61 @@ public class TransactionManagerTest {
assertTrue(secondResponseFuture.isDone());
}
+ @ParameterizedTest
+ @EnumSource(names = {
+ "UNKNOWN_TOPIC_OR_PARTITION",
+ "REQUEST_TIMED_OUT",
+ "COORDINATOR_LOAD_IN_PROGRESS",
+ "CONCURRENT_TRANSACTIONS"
+ })
+ public void testRetriableErrors2(Errors error) {
+ // Ensure FindCoordinator retries.
+ TransactionalRequestResult result = transactionManager.initializeTransactions();
+ prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+ prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+ runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+ assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+ // Ensure InitPid retries.
+ prepareInitPidResponse(error, false, producerId, epoch);
+ prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+ runUntil(transactionManager::hasProducerId);
+
+ result.await();
+ transactionManager.beginTransaction();
+
+ // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+ Errors addPartitionsToTxnError = error.equals(Errors.CONCURRENT_TRANSACTIONS) ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+ transactionManager.maybeAddPartition(tp0);
+ prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+ prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+ runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+ // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+ // Ensure EndTxn retries.
+ TransactionalRequestResult abortResult = transactionManager.beginCommit();
+ prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+ runUntil(abortResult::isCompleted);
+ assertTrue(abortResult.isSuccessful());
+ }
+
+ @Test
+ public void testCoordinatorNotAvailable() {
+ // A FindCoordinator answered first with COORDINATOR_NOT_AVAILABLE must be retried until it succeeds.
+ TransactionalRequestResult initResult = transactionManager.initializeTransactions();
+ prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId);
+ prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+ runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+ assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+ // With the coordinator discovered, a single successful InitProducerId response is enough.
+ prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+ runUntil(transactionManager::hasProducerId);
+ // Wait for initializeTransactions() to finish.
+ initResult.await();
+ }
+
@Test
public void testProducerFencedExceptionInInitProducerId() {
verifyProducerFencedForInitProducerId(Errors.PRODUCER_FENCED);