You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/13 20:36:42 UTC

[kafka] branch trunk updated: MINOR: Throw ProducerFencedException directly but with a new instance (#6717)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 63e4f67  MINOR: Throw ProducerFencedException directly but with a new instance (#6717)
63e4f67 is described below

commit 63e4f67d9ba9e08bdce705b35c5acf32dcd20633
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon May 13 13:36:26 2019 -0700

    MINOR: Throw ProducerFencedException directly but with a new instance (#6717)
    
    Currently, maybeFailWithError always wraps lastError in a KafkaException. A ProducerFencedException, however, should be thrown directly rather than wrapped. We throw a new instance instead of the book-kept one because the stored exception's call trace comes from an earlier call, not the current one, and could therefore be confusing.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../producer/internals/TransactionManager.java      | 13 +++++++++++--
 .../producer/internals/TransactionManagerTest.java  | 21 ++++++++++++++++++---
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index e24d69b..9ed0dde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
@@ -835,8 +836,16 @@ public class TransactionManager {
     }
 
     private void maybeFailWithError() {
-        if (hasError())
-            throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
+        if (hasError()) {
+            // for ProducerFencedException, do not wrap it as a KafkaException
+            // but create a new instance without the call trace since it was not thrown because of the current call
+            if (lastError instanceof ProducerFencedException) {
+                throw new ProducerFencedException("The producer has been rejected from the broker because " +
+                    "it tried to use an old epoch with the transactionalId");
+            } else {
+                throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
+            }
+        }
     }
 
     private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index ca03b76..eceb8df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
@@ -85,6 +86,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
 public class TransactionManagerTest {
@@ -1356,8 +1358,8 @@ public class TransactionManagerTest {
         assertTrue(secondResponseFuture.isDone());
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testProducerFencedException() throws InterruptedException, ExecutionException {
+    @Test
+    public void testProducerFencedException() throws InterruptedException {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -1378,7 +1380,20 @@ public class TransactionManagerTest {
 
         assertTrue(responseFuture.isDone());
         assertTrue(transactionManager.hasError());
-        responseFuture.get();
+
+        try {
+            // make sure the produce was expired.
+            responseFuture.get();
+            fail("Expected to get a ExecutionException from the response");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof ProducerFencedException);
+        }
+
+        // make sure the exception was thrown directly from the follow-up calls.
+        assertThrows(ProducerFencedException.class, () -> transactionManager.beginTransaction());
+        assertThrows(ProducerFencedException.class, () -> transactionManager.beginCommit());
+        assertThrows(ProducerFencedException.class, () -> transactionManager.beginAbort());
+        assertThrows(ProducerFencedException.class, () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), "dummyId"));
     }
 
     @Test