You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/13 20:36:42 UTC
[kafka] branch trunk updated: MINOR: Throw ProducerFencedException
directly but with a new instance (#6717)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 63e4f67 MINOR: Throw ProducerFencedException directly but with a new instance (#6717)
63e4f67 is described below
commit 63e4f67d9ba9e08bdce705b35c5acf32dcd20633
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon May 13 13:36:26 2019 -0700
MINOR: Throw ProducerFencedException directly but with a new instance (#6717)
Currently, maybeFailWithError always wraps lastError in a KafkaException. For a ProducerFencedException, however, we should throw the exception itself rather than a wrapper. We throw a new instance instead of the book-kept one because the stored exception's call trace does not come from the current call and could therefore be confusing.
Reviewers: Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
.../producer/internals/TransactionManager.java | 13 +++++++++++--
.../producer/internals/TransactionManagerTest.java | 21 ++++++++++++++++++---
2 files changed, 29 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index e24d69b..9ed0dde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
@@ -835,8 +836,16 @@ public class TransactionManager {
}
private void maybeFailWithError() {
- if (hasError())
- throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
+ if (hasError()) {
+ // for ProducerFencedException, do not wrap it as a KafkaException
+ // but create a new instance without the call trace since it was not thrown because of the current call
+ if (lastError instanceof ProducerFencedException) {
+ throw new ProducerFencedException("The producer has been rejected from the broker because " +
+ "it tried to use an old epoch with the transactionalId");
+ } else {
+ throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
+ }
+ }
}
private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index ca03b76..eceb8df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
@@ -85,6 +86,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
public class TransactionManagerTest {
@@ -1356,8 +1358,8 @@ public class TransactionManagerTest {
assertTrue(secondResponseFuture.isDone());
}
- @Test(expected = ExecutionException.class)
- public void testProducerFencedException() throws InterruptedException, ExecutionException {
+ @Test
+ public void testProducerFencedException() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
@@ -1378,7 +1380,20 @@ public class TransactionManagerTest {
assertTrue(responseFuture.isDone());
assertTrue(transactionManager.hasError());
- responseFuture.get();
+
+ try {
+ // make sure the produce was expired.
+ responseFuture.get();
+ fail("Expected to get a ExecutionException from the response");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof ProducerFencedException);
+ }
+
+ // make sure the exception was thrown directly from the follow-up calls.
+ assertThrows(ProducerFencedException.class, () -> transactionManager.beginTransaction());
+ assertThrows(ProducerFencedException.class, () -> transactionManager.beginCommit());
+ assertThrows(ProducerFencedException.class, () -> transactionManager.beginAbort());
+ assertThrows(ProducerFencedException.class, () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), "dummyId"));
}
@Test