You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/22 19:31:54 UTC

[kafka] branch 2.7 updated: Handle ProducerFencedException on offset commit (#9479)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 36c3b2f  Handle ProducerFencedException on offset commit (#9479)
36c3b2f is described below

commit 36c3b2fbfefe256dbea436807827fed54ba4d52a
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Thu Oct 22 21:23:01 2020 +0200

    Handle ProducerFencedException on offset commit (#9479)
    
    The transaction manager currently does not handle producer fenced errors returned from an offset commit request.
    
    Adds handling of producer fenced errors by treating PRODUCER_FENCED as a fatal error.
    
    Reviewers: Boyang Chen <bo...@apache.org>, John Roesler <vv...@apache.org>
---
 .../producer/internals/TransactionManager.java     |  1 +
 .../producer/internals/TransactionManagerTest.java | 50 ++++++++++++++++++++++
 2 files changed, 51 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index afe4779..ca53dba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1742,6 +1742,7 @@ public class TransactionManager {
     private boolean isFatalException(Errors error) {
         return error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
                    || error == Errors.INVALID_PRODUCER_EPOCH
+                   || error == Errors.PRODUCER_FENCED
                    || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT;
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index d8fe3ed..53fb273 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -97,6 +97,9 @@ import java.util.function.Supplier;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -2060,6 +2063,53 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testHandlingOfProducerFencedErrorOnTxnOffsetCommit() {
+        testFatalErrorInTxnOffsetCommit(Errors.PRODUCER_FENCED);
+    }
+
+    @Test
+    public void testHandlingOfTransactionalIdAuthorizationFailedErrorOnTxnOffsetCommit() {
+        testFatalErrorInTxnOffsetCommit(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
+    }
+
+    @Test
+    public void testHandlingOfInvalidProducerEpochErrorOnTxnOffsetCommit() {
+        testFatalErrorInTxnOffsetCommit(Errors.INVALID_PRODUCER_EPOCH);
+    }
+
+    @Test
+    public void testHandlingOfUnsupportedForMessageFormatErrorOnTxnOffsetCommit() {
+        testFatalErrorInTxnOffsetCommit(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT);
+    }
+
+    private void testFatalErrorInTxnOffsetCommit(final Errors error) {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp0, new OffsetAndMetadata(1));
+        offsets.put(tp1, new OffsetAndMetadata(1));
+
+        TransactionalRequestResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(
+            offsets, new ConsumerGroupMetadata(consumerGroupId));
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
+        runUntil(() -> !client.hasPendingResponses());
+        assertThat(addOffsetsResult.isCompleted(), is(false));  // The request should complete only after the TxnOffsetCommit completes.
+
+        Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
+        txnOffsetCommitResponse.put(tp0, Errors.NONE);
+        txnOffsetCommitResponse.put(tp1, error);
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
+        prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, txnOffsetCommitResponse);
+
+        runUntil(addOffsetsResult::isCompleted);
+        assertThat(addOffsetsResult.isSuccessful(), is(false));
+        assertThat(addOffsetsResult.error(), instanceOf(error.exception().getClass()));
+    }
+
+    @Test
     public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
         doInitTransactions();