Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/25 03:57:01 UTC

[GitHub] [kafka] abbccdda opened a new pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

abbccdda opened a new pull request #8549:
URL: https://github.com/apache/kafka/pull/8549


   Add a separate error code, PRODUCER_FENCED, to differentiate it from INVALID_PRODUCER_EPOCH.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r468900386



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -539,7 +539,7 @@ class TransactionCoordinator(brokerId: Int,
           s"${txnIdAndPidEpoch.transactionalId} due to timeout")
 
       case error@(Errors.INVALID_PRODUCER_ID_MAPPING |
-                  Errors.INVALID_PRODUCER_EPOCH |

Review comment:
       My confusion was that for new broker <-> new client communication, could we still return `INVALID_PRODUCER_EPOCH` as the error code? From KIP-588 my understanding is that, txn-coordinator -> producer would not return `INVALID_PRODUCER_EPOCH` anymore for all txn-related requests, and only broker -> producer ProduceResponse would return `INVALID_PRODUCER_EPOCH`. Now I get that you are maintaining the javadoc / handling for compatibility. So we are good :)







[GitHub] [kafka] guozhangwang commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-633338325


   test this please





[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672476817


   @guozhangwang Seems we have an import control:
   ```
   [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:36:1: Disallowed import - org.apache.kafka.clients.producer.internals.InvalidProducerEpochException. [ImportControl]
   ```





[GitHub] [kafka] abbccdda commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r468693036



##########
File path: clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
       sg







[GitHub] [kafka] guozhangwang commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r468327920



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1366,6 +1366,10 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
+            } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
+                // We could still receive INVALID_PRODUCER_EPOCH from transaction coordinator,

Review comment:
       Good catch.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1417,8 +1421,10 @@ public void handleResponse(AbstractResponse response) {
                 } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                     reenqueue();
                     return;
-                } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                    fatalError(error.exception());
+                } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {

Review comment:
       nit: ".. old versioned transaction coordinator", ditto below.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1571,6 +1637,54 @@ public void testProducerFencedException() throws InterruptedException {
             Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
     }
 
+    @Test
+    public void testInvalidProducerEpochConvertToProducerFencedInEndTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+        prepareEndTxnResponse(Errors.INVALID_PRODUCER_EPOCH, TransactionResult.COMMIT, producerId, epoch);
+
+        runUntil(commitResult::isCompleted);
+        runUntil(responseFuture::isDone);
+
+        // make sure the exception was thrown directly from the follow-up calls.
+        assertThrows(KafkaException.class, () -> transactionManager.beginTransaction());
+        assertThrows(KafkaException.class, () -> transactionManager.beginCommit());
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
+        assertThrows(KafkaException.class, () -> transactionManager.sendOffsetsToTransaction(
+            Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
+    }
+
+    @Test
+    public void testInvalidProducerEpochFromProduce() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, producerId, epoch);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+
+        sender.runOnce();
+
+        runUntil(responseFuture::isDone);
+        assertFalse(transactionManager.hasError());

Review comment:
       Thanks for adding the coverage!

##########
File path: clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
       Why not put this internal exception in `org.apache.kafka.clients.producer.internals`?

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -539,7 +539,7 @@ class TransactionCoordinator(brokerId: Int,
           s"${txnIdAndPidEpoch.transactionalId} due to timeout")
 
       case error@(Errors.INVALID_PRODUCER_ID_MAPPING |
-                  Errors.INVALID_PRODUCER_EPOCH |

Review comment:
       In the new broker version, when do we still return `INVALID_PRODUCER_EPOCH` then? Or would we never return it any more?







[GitHub] [kafka] abbccdda commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r416057712



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -297,46 +298,271 @@ class KafkaApisTest {
     val topic = "topic"
     setupBasicMetadataCache(topic, numPartitions = 2)
 
-    EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
+    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion) {

Review comment:
       Side improvement for `shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient`







[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-648958817


   retest this





[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-638285212


   @guozhangwang Actually, my previous response was not correct. By making `InvalidProducerEpochException` extend `RetriableException`, we could bypass the `sender#canRetry` check to make it non-fatal.
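
A minimal sketch of the idea in the comment above, using stand-in classes rather than the real Kafka types: if a retry gate asks whether the exception is a retriable type, the epoch error becomes non-fatal once its exception class extends that base type. `SketchRetriableException`, `SketchInvalidProducerEpochException`, `SketchProducerFencedException`, and `RetryGate.canRetry` are hypothetical names for illustration only; the actual `Sender#canRetry` logic checks more than this.

```java
// Stand-in exception hierarchy; these are NOT the Kafka classes.
class SketchRetriableException extends RuntimeException {
    SketchRetriableException(String message) {
        super(message);
    }
}

// Assumption: the epoch error seen on the produce path is modeled as retriable.
class SketchInvalidProducerEpochException extends SketchRetriableException {
    SketchInvalidProducerEpochException(String message) {
        super(message);
    }
}

// Assumption: a fenced producer is modeled as non-retriable, i.e. fatal.
class SketchProducerFencedException extends RuntimeException {
    SketchProducerFencedException(String message) {
        super(message);
    }
}

class RetryGate {
    // Simplified stand-in for a canRetry-style check: retry only while attempts
    // remain and the error is of the retriable type.
    static boolean canRetry(RuntimeException error, int attempts, int maxRetries) {
        return attempts < maxRetries && error instanceof SketchRetriableException;
    }

    public static void main(String[] args) {
        System.out.println(canRetry(new SketchInvalidProducerEpochException("stale epoch"), 0, 3));
        System.out.println(canRetry(new SketchProducerFencedException("fenced"), 0, 3));
    }
}
```

The point of the design is that the retry decision keys off the exception type, so changing the parent class is enough to change how the error is treated without adding a special case to the gate.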





[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672044991


   2/3 with one flaky test





[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672584897


   retest this please





[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672958592


   All 3 pass, with minor exception due to ssh channel





[GitHub] [kafka] guozhangwang commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672326346









[GitHub] [kafka] abbccdda merged pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #8549:
URL: https://github.com/apache/kafka/pull/8549


   





[GitHub] [kafka] abbccdda commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r430137301



##########
File path: clients/src/main/java/org/apache/kafka/common/errors/InvalidProducerEpochException.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the produce request sent to the partition leader
+ * contains a non-matching producer epoch. When encountering this exception, the ongoing transaction
+ * will be aborted and can be retried.
+ */
+public class InvalidProducerEpochException extends RetriableException {

Review comment:
       Sg, let me have a try.







[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672585991


   Started system tests:
   - https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4120/
   - https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4119/
   - https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4118/





[GitHub] [kafka] guozhangwang commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672481673


   > @guozhangwang Seems we have an import control:
   
   @ableegoldman I see.. then we'd have to move it back.
   
   I have no further comments. Once the builds are green and the system tests pass (I think the only ones we really care about are the upgrade tests), please feel free to merge.





[GitHub] [kafka] guozhangwang commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672307404


   test this





[GitHub] [kafka] zhaohaidao commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-646932428


   > Resilience Improvement With Old Brokers
   > When the client is on the latest version but the broker is old, the client shall still see INVALID_PRODUCER_EPOCH from produce response and transactional responses. In either case, we shall abort the current transaction and retry with transaction coordinator. If the transaction coordinator doesn't support InitProducerId with epoch, we should enter fatal state just as KIP-360.
   
   @abbccdda 
   This PR's treatment of INVALID_PRODUCER_EPOCH looks inconsistent with the description of KIP-588 as quoted above.
   In this PR, INVALID_PRODUCER_EPOCH will be converted to PRODUCER_FENCED. It makes sense, as INVALID_PRODUCER_EPOCH from an old broker should be treated as a fatal error, the same as PRODUCER_FENCED.
   I'm not sure if the resilience improvement with old brokers is still needed.





[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-647041336


   @zhaohaidao Thanks for taking a look. I think the KIP description is wrong about this scenario. For a transaction coordinator RPC, the coordinator will:
   1. return INVALID_PRODUCER_EPOCH as the producer fenced exception when it is on an old version
   2. return PRODUCER_FENCED as the producer fenced exception when it is on a new version
   
   So in either case, we are assumed to be fenced from the perspective of the transaction coordinator. The right handling would be to kill the Producer and re-initialize.
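
The handling described above can be sketched as a small client-side classifier. This is only an illustration with stand-in types: `WireError`, `ResponseSource`, `Handling`, and `FencingClassifier.classify` are hypothetical names, not part of the Kafka client. It assumes that either code from the transaction coordinator means the producer is fenced, while INVALID_PRODUCER_EPOCH from a produce response leads to aborting the ongoing transaction and retrying, as the `InvalidProducerEpochException` javadoc in this PR states. Treating PRODUCER_FENCED as fatal regardless of its source is an assumption of the sketch.

```java
// Stand-in enums; these are NOT the Kafka Errors enum or client types.
enum WireError { NONE, INVALID_PRODUCER_EPOCH, PRODUCER_FENCED }

enum ResponseSource { TRANSACTION_COORDINATOR, PARTITION_LEADER }

enum Handling { OK, FATAL_FENCED, ABORT_AND_RETRY }

class FencingClassifier {
    // Maps (source of the response, error code) to how the client reacts.
    static Handling classify(ResponseSource source, WireError error) {
        if (error == WireError.NONE) {
            return Handling.OK;
        }
        if (source == ResponseSource.TRANSACTION_COORDINATOR) {
            // An old coordinator reports fencing as INVALID_PRODUCER_EPOCH and a
            // new one as PRODUCER_FENCED; both are treated as fenced here.
            return Handling.FATAL_FENCED;
        }
        // Produce path. Assumption of this sketch: PRODUCER_FENCED stays fatal,
        // INVALID_PRODUCER_EPOCH aborts the ongoing transaction and retries.
        return error == WireError.PRODUCER_FENCED
                ? Handling.FATAL_FENCED
                : Handling.ABORT_AND_RETRY;
    }

    public static void main(String[] args) {
        System.out.println(classify(ResponseSource.TRANSACTION_COORDINATOR, WireError.INVALID_PRODUCER_EPOCH));
        System.out.println(classify(ResponseSource.PARTITION_LEADER, WireError.INVALID_PRODUCER_EPOCH));
    }
}
```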








[GitHub] [kafka] abbccdda commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r468694089



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -539,7 +539,7 @@ class TransactionCoordinator(brokerId: Int,
           s"${txnIdAndPidEpoch.transactionalId} due to timeout")
 
       case error@(Errors.INVALID_PRODUCER_ID_MAPPING |
-                  Errors.INVALID_PRODUCER_EPOCH |

Review comment:
       We do return `INVALID_PRODUCER_EPOCH` when the client is on an old version, as it doesn't recognize the new error code. See `KafkaApis`.
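
The compatibility rule described above can be sketched as a small version gate on the broker side. This is an illustration under stated assumptions, not the code in `KafkaApis`: `SketchError`, `ErrorDowngrade.errorForClient`, and the `firstVersionWithProducerFenced` threshold are hypothetical, and the real per-API version cut-offs are not reproduced here.

```java
// Hypothetical two-value error enum; NOT the Kafka Errors enum.
enum SketchError { INVALID_PRODUCER_EPOCH, PRODUCER_FENCED }

class ErrorDowngrade {
    /**
     * Returns the error code to send for a given request version.
     * Clients below firstVersionWithProducerFenced (an assumed, per-API threshold)
     * do not recognize PRODUCER_FENCED, so they receive the legacy code instead.
     */
    static SketchError errorForClient(SketchError error,
                                      int requestVersion,
                                      int firstVersionWithProducerFenced) {
        if (error == SketchError.PRODUCER_FENCED
                && requestVersion < firstVersionWithProducerFenced) {
            return SketchError.INVALID_PRODUCER_EPOCH;
        }
        return error;
    }

    public static void main(String[] args) {
        int threshold = 4; // assumed value, for illustration only
        System.out.println(errorForClient(SketchError.PRODUCER_FENCED, 3, threshold));
        System.out.println(errorForClient(SketchError.PRODUCER_FENCED, 4, threshold));
    }
}
```

Keeping the downgrade at the response boundary means the coordinator logic can use the new code internally, and only the wire format depends on what the client understands.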







[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-633787779


   Only failed with known flaky test:
   ```
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```





[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-672560761


   @guozhangwang Thanks!





[GitHub] [kafka] guozhangwang commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r430034971



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1415,8 +1419,10 @@ public void handleResponse(AbstractResponse response) {
                 } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                     reenqueue();
                     return;
-                } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                    fatalError(error.exception());
+                } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
+                    // For older versions, we could still receive INVALID_PRODUCER_EPOCH from transaction coordinator.

Review comment:
       nit: `For older versions..` -> `we could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator...`

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1364,6 +1364,10 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
+            } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
+                // For older versions, we could still receive INVALID_PRODUCER_EPOCH from transaction coordinator.

Review comment:
       nit: ditto here, `We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator.`

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1630,8 +1638,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
                 abortableErrorIfPossible(error.exception());
-            } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                fatalError(error.exception());
+            } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
+                // For older versions, we could still receive INVALID_PRODUCER_EPOCH from transaction coordinator.

Review comment:
       Ditto here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1573,8 +1579,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
-            } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                fatalError(error.exception());
+            } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
+                // For older versions, we could still receive INVALID_PRODUCER_EPOCH from transaction coordinator.

Review comment:
       Ditto here.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1571,6 +1637,54 @@ public void testProducerFencedException() throws InterruptedException {
             Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
     }
 
+    @Test
+    public void testInvalidProducerEpochConvertToProducerFencedInEndTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+        prepareEndTxnResponse(Errors.INVALID_PRODUCER_EPOCH, TransactionResult.COMMIT, producerId, epoch);
+
+        runUntil(commitResult::isCompleted);
+        runUntil(responseFuture::isDone);
+
+        // make sure the exception was thrown directly from the follow-up calls.
+        assertThrows(KafkaException.class, () -> transactionManager.beginTransaction());
+        assertThrows(KafkaException.class, () -> transactionManager.beginCommit());
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
+        assertThrows(KafkaException.class, () -> transactionManager.sendOffsetsToTransaction(
+            Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
+    }
+
+    @Test
+    public void testInvalidProducerEpochFromProduce() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, producerId, epoch);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+
+        sender.runOnce();
+
+        runUntil(responseFuture::isDone);
+        assertFalse(transactionManager.hasError());

Review comment:
       Could we also verify that the producer did retry by sending initPid to the txn coordinator?

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/InvalidProducerEpochException.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the produce request sent to the partition leader
+ * contains a non-matching producer epoch. When encountering this exception, the ongoing transaction
+ * will be aborted and can be retried.
+ */
+public class InvalidProducerEpochException extends RetriableException {

Review comment:
       If my understanding is correct this exception would never be thrown out to the caller? If that's the case do we need to put it as a public class, or could we put it as part of o.a.k.clients.internals?



