You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/22 19:31:54 UTC
[kafka] branch 2.7 updated: Handle ProducerFencedException on
offset commit (#9479)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 36c3b2f Handle ProducerFencedException on offset commit (#9479)
36c3b2f is described below
commit 36c3b2fbfefe256dbea436807827fed54ba4d52a
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Thu Oct 22 21:23:01 2020 +0200
Handle ProducerFencedException on offset commit (#9479)
The transaction manager does currently not handle producer fenced errors returned from a offset commit request.
Adds the handling of the producer fenced errors.
Reviewers: Boyang Chen <bo...@apache.org>, John Roesler <vv...@apache.org>
---
.../producer/internals/TransactionManager.java | 1 +
.../producer/internals/TransactionManagerTest.java | 50 ++++++++++++++++++++++
2 files changed, 51 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index afe4779..ca53dba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1742,6 +1742,7 @@ public class TransactionManager {
private boolean isFatalException(Errors error) {
return error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
|| error == Errors.INVALID_PRODUCER_EPOCH
+ || error == Errors.PRODUCER_FENCED
|| error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT;
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index d8fe3ed..53fb273 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -97,6 +97,9 @@ import java.util.function.Supplier;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -2060,6 +2063,53 @@ public class TransactionManagerTest {
}
@Test
+ public void testHandlingOfProducerFencedErrorOnTxnOffsetCommit() {
+ testFatalErrorInTxnOffsetCommit(Errors.PRODUCER_FENCED);
+ }
+
+ @Test
+ public void testHandlingOfTransactionalIdAuthorizationFailedErrorOnTxnOffsetCommit() {
+ testFatalErrorInTxnOffsetCommit(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
+ }
+
+ @Test
+ public void testHandlingOfInvalidProducerEpochErrorOnTxnOffsetCommit() {
+ testFatalErrorInTxnOffsetCommit(Errors.INVALID_PRODUCER_EPOCH);
+ }
+
+ @Test
+ public void testHandlingOfUnsupportedForMessageFormatErrorOnTxnOffsetCommit() {
+ testFatalErrorInTxnOffsetCommit(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT);
+ }
+
+ private void testFatalErrorInTxnOffsetCommit(final Errors error) {
+ doInitTransactions();
+
+ transactionManager.beginTransaction();
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp0, new OffsetAndMetadata(1));
+ offsets.put(tp1, new OffsetAndMetadata(1));
+
+ TransactionalRequestResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(
+ offsets, new ConsumerGroupMetadata(consumerGroupId));
+ prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
+ runUntil(() -> !client.hasPendingResponses());
+ assertThat(addOffsetsResult.isCompleted(), is(false)); // The request should complete only after the TxnOffsetCommit completes.
+
+ Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
+ txnOffsetCommitResponse.put(tp0, Errors.NONE);
+ txnOffsetCommitResponse.put(tp1, error);
+
+ prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
+ prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, txnOffsetCommitResponse);
+
+ runUntil(addOffsetsResult::isCompleted);
+ assertThat(addOffsetsResult.isSuccessful(), is(false));
+ assertThat(addOffsetsResult.error(), instanceOf(error.exception().getClass()));
+ }
+
+ @Test
public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
doInitTransactions();