You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/11 05:23:33 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

guozhangwang commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r468327920



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1366,6 +1366,10 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
+            } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
+                // We could still receive INVALID_PRODUCER_EPOCH from transaction coordinator,

Review comment:
       Good catch.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1417,8 +1421,10 @@ public void handleResponse(AbstractResponse response) {
                 } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                     reenqueue();
                     return;
-                } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                    fatalError(error.exception());
+                } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {

Review comment:
       nit: ".. old versioned transaction coordinator", ditto below.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1571,6 +1637,54 @@ public void testProducerFencedException() throws InterruptedException {
             Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
     }
 
+    @Test
+    public void testInvalidProducerEpochConvertToProducerFencedInEndTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+        prepareEndTxnResponse(Errors.INVALID_PRODUCER_EPOCH, TransactionResult.COMMIT, producerId, epoch);
+
+        runUntil(commitResult::isCompleted);
+        runUntil(responseFuture::isDone);
+
+        // make sure the exception was thrown directly from the follow-up calls.
+        assertThrows(KafkaException.class, () -> transactionManager.beginTransaction());
+        assertThrows(KafkaException.class, () -> transactionManager.beginCommit());
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
+        assertThrows(KafkaException.class, () -> transactionManager.sendOffsetsToTransaction(
+            Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
+    }
+
+    @Test
+    public void testInvalidProducerEpochFromProduce() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, producerId, epoch);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+
+        sender.runOnce();
+
+        runUntil(responseFuture::isDone);
+        assertFalse(transactionManager.hasError());

Review comment:
       Thanks for adding the coverage!

##########
File path: clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
       Why not put this internal exception in `org.apache.kafka.clients.producer.internals`?

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -539,7 +539,7 @@ class TransactionCoordinator(brokerId: Int,
           s"${txnIdAndPidEpoch.transactionalId} due to timeout")
 
       case error@(Errors.INVALID_PRODUCER_ID_MAPPING |
-                  Errors.INVALID_PRODUCER_EPOCH |

Review comment:
       In the new broker version, when do we still return `INVALID_PRODUCER_EPOCH` then? Or would we never return it any more?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org