Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/19 15:00:42 UTC

[GitHub] [kafka] zhaohaidao opened a new pull request #9311: KAFKA-9910: Implement new transaction timed out error

zhaohaidao opened a new pull request #9311:
URL: https://github.com/apache/kafka/pull/9311


   The producer recovers by internally sending InitProducerId with the current epoch when it receives a TRANSACTION_TIMED_OUT error.
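   A minimal, self-contained sketch of the recovery flow described above, assuming a simplified producer-side model. `TxnRecoveryModel`, its local `Errors` enum and the `InitProducerIdRequest` class are hypothetical stand-ins, not the real `TransactionManager` API. In this model a TRANSACTION_TIMED_OUT error is not fatal: it marks the producer for recovery, and the recovery request carries the producer's current id and epoch. That the coordinator's response carries a bumped epoch is also an assumption of the sketch.

```java
import java.util.Optional;

class TxnRecoveryModel {
    // Hypothetical subset of the error codes relevant to this PR.
    enum Errors { NONE, TRANSACTION_TIMED_OUT, PRODUCER_FENCED }

    // Hypothetical request shape: the recovery request carries the current id and epoch.
    static final class InitProducerIdRequest {
        final long producerId;
        final short producerEpoch;

        InitProducerIdRequest(long producerId, short producerEpoch) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
        }
    }

    private final long producerId;
    private short epoch;
    private boolean recoveryPending = false;

    TxnRecoveryModel(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    /** Only PRODUCER_FENCED is fatal here; a timeout marks the producer for recovery. */
    void handleError(Errors error) {
        switch (error) {
            case TRANSACTION_TIMED_OUT:
                recoveryPending = true; // recoverable: re-initialize with the current epoch
                break;
            case PRODUCER_FENCED:
                throw new IllegalStateException("producer fenced");
            default:
                break; // NONE: nothing to do
        }
    }

    /** The InitProducerId request to send next, if a timeout is being recovered from. */
    Optional<InitProducerIdRequest> recoveryRequest() {
        return recoveryPending
                ? Optional.of(new InitProducerIdRequest(producerId, epoch))
                : Optional.empty();
    }

    /** Applies the coordinator's response, assumed to carry the bumped epoch. */
    void completeRecovery(short bumpedEpoch) {
        epoch = bumpedEpoch;
        recoveryPending = false;
    }

    short epoch() { return epoch; }

    public static void main(String[] args) {
        TxnRecoveryModel producer = new TxnRecoveryModel(1000L, (short) 5);
        producer.handleError(Errors.TRANSACTION_TIMED_OUT);
        producer.recoveryRequest().ifPresent(r ->
                System.out.println("InitProducerId id=" + r.producerId + " epoch=" + r.producerEpoch));
        producer.completeRecovery((short) 6);
        System.out.println("epoch after recovery: " + producer.epoch());
    }
}
```

   The design point the sketch isolates is that a timeout and a fence take different paths: the first leads to a new InitProducerId carrying the old epoch, the second ends the producer.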
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502821475



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -580,6 +580,52 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertTrue(transactionalRecords.isEmpty)
   }
 
+  @Test
+  def testProducerUsableAfterTxnTimeOutAbortExcetion(): Unit = {

Review comment:
       Could we use the same template to reproduce the scenario where the user doesn't call abortTxn and immediately resumes working?







[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r493695444



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1072,7 +1076,11 @@ private void transitionTo(State target, RuntimeException error) {
             if (error == null)
                 throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
             lastError = error;
+            abortableError = error;
         } else {
+            if (target != State.ABORTING_TRANSACTION) {

Review comment:
       yes







[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r504225780



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1198,18 +1209,22 @@ boolean canBumpEpoch() {
         return coordinatorSupportsBumpingEpoch;
     }
 
+    private void resetTransactions() {
+        newPartitionsInTransaction.clear();
+        pendingPartitionsInTransaction.clear();
+        partitionsInTransaction.clear();
+    }
+
     private void completeTransaction() {
         if (epochBumpRequired) {
             transitionTo(State.INITIALIZING);
         } else {
             transitionTo(State.READY);
         }
         lastError = null;
+        abortableError = null;
         epochBumpRequired = false;
-        transactionStarted = false;

Review comment:
       Are we missing the update for this flag?
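       A minimal sketch of the state reset under discussion, assuming a simplified stand-in for `TransactionManager` (the `TxnStateModel` class is hypothetical). `resetTransactions()` clears the three partition sets exactly as in the diff. `completeTransaction()` resets every per-transaction field, including `transactionStarted`, which is the flag this question is about; calling `resetTransactions()` from `completeTransaction()` is an assumption of the sketch, since the hunk does not show where the helper is called. Whether the real class should reset the flag here or elsewhere is the open point of the review, not something the sketch settles.

```java
import java.util.HashSet;
import java.util.Set;

class TxnStateModel {
    enum State { READY, INITIALIZING, IN_TRANSACTION }

    State state = State.IN_TRANSACTION;
    final Set<String> newPartitionsInTransaction = new HashSet<>();
    final Set<String> pendingPartitionsInTransaction = new HashSet<>();
    final Set<String> partitionsInTransaction = new HashSet<>();
    RuntimeException lastError;
    RuntimeException abortableError;
    boolean epochBumpRequired;
    boolean transactionStarted = true;

    // Mirrors the helper added in the diff: drop all partition bookkeeping.
    void resetTransactions() {
        newPartitionsInTransaction.clear();
        pendingPartitionsInTransaction.clear();
        partitionsInTransaction.clear();
    }

    void completeTransaction() {
        state = epochBumpRequired ? State.INITIALIZING : State.READY;
        lastError = null;
        abortableError = null;
        epochBumpRequired = false;
        transactionStarted = false; // the flag the reviewer asks about
        resetTransactions();        // assumption: called here in this model
    }

    public static void main(String[] args) {
        TxnStateModel m = new TxnStateModel();
        m.partitionsInTransaction.add("topic1-0");
        m.epochBumpRequired = true;
        m.completeTransaction();
        System.out.println(m.state + " started=" + m.transactionStarted
                + " partitionsCleared=" + m.partitionsInTransaction.isEmpty());
    }
}
```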

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1438,7 +1455,8 @@ public void handleResponse(AbstractResponse response) {
                     log.debug("Did not attempt to add partition {} to transaction because other partitions in the " +
                             "batch had errors.", topicPartition);
                     hasPartitionErrors = true;
-                } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
+                } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING

Review comment:
       nit: we could add a helper for the combined error equality check.
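       A minimal sketch of such a helper. The name `isProducerIdOrTimeoutError` and the local `Errors` enum are hypothetical stand-ins for Kafka's own `Errors` type. The membership of the set is an assumption: the condition in the diff is cut off after `INVALID_PRODUCER_ID_MAPPING`, and adding `TRANSACTION_TIMED_OUT` is only inferred from the purpose of the PR. An `EnumSet` keeps the check to a single membership lookup that every call site could share.

```java
import java.util.EnumSet;
import java.util.Set;

class ErrorChecks {
    // Hypothetical stand-in for the relevant subset of Kafka's Errors enum.
    enum Errors { NONE, UNKNOWN_PRODUCER_ID, INVALID_PRODUCER_ID_MAPPING, TRANSACTION_TIMED_OUT, PRODUCER_FENCED }

    // Assumed membership: the errors grouped together in the else-if branch of the diff.
    private static final Set<Errors> GROUPED_ERRORS = EnumSet.of(
            Errors.UNKNOWN_PRODUCER_ID,
            Errors.INVALID_PRODUCER_ID_MAPPING,
            Errors.TRANSACTION_TIMED_OUT);

    /** Hypothetical helper replacing the chained `error == ... || error == ...` checks. */
    static boolean isProducerIdOrTimeoutError(Errors error) {
        return GROUPED_ERRORS.contains(error);
    }

    public static void main(String[] args) {
        for (Errors e : Errors.values())
            System.out.println(e + " -> " + isProducerIdOrTimeoutError(e));
    }
}
```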

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -671,6 +782,57 @@ class KafkaApisTest {
     }
   }
 
+  @Test
+  def shouldReplaceTxnTimeoutWithInvalidProducerEpochInEndTxnWithOlderClient(): Unit = {

Review comment:
       Could we consolidate the tests with `shouldReplaceProducerFencedWithInvalidProducerEpochInEndTxnWithOlderClient`, similar to the other error code replacement tests?

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,9 +388,14 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
-              Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch || producerEpoch < txnMetadata.producerEpoch) {

Review comment:
       nit: I slightly prefer the previous condition format, with parentheses around the && clause.
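       Some context for this nit: in both Java and Scala, `&&` binds more tightly than `||`, so `a && b || c` already parses as `(a && b) || c`. The two formats are therefore equivalent and the parentheses only make the grouping explicit. The check below, written in Java with plain booleans standing in for the epoch comparisons in the diff, compares the unparenthesized condition against both possible groupings over all eight input combinations.

```java
class PrecedenceCheck {
    // Counts the input combinations where the unparenthesized condition
    // disagrees with the chosen explicit grouping.
    static int mismatches(boolean leftGrouping) {
        int count = 0;
        for (int i = 0; i < 8; i++) {
            boolean isFromClient = (i & 1) != 0;   // stands in for isFromClient
            boolean epochMismatch = (i & 2) != 0;  // producerEpoch != txnMetadata.producerEpoch
            boolean epochTooOld = (i & 4) != 0;    // producerEpoch < txnMetadata.producerEpoch
            boolean bare = isFromClient && epochMismatch || epochTooOld;
            boolean explicit = leftGrouping
                    ? (isFromClient && epochMismatch) || epochTooOld  // previous formatting
                    : isFromClient && (epochMismatch || epochTooOld); // a different grouping
            if (bare != explicit) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("vs (a && b) || c: " + mismatches(true) + " mismatches");
        System.out.println("vs a && (b || c): " + mismatches(false) + " mismatches");
    }
}
```

       The first count is 0 and the second is 2, which shows the parentheses in the previous format change nothing, while grouping the other way would change behavior.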

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -369,7 +372,9 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
         // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server will not accept an EndTxnRequest, so skip
         // directly to InitProducerId. Otherwise, we must first abort the transaction, because the producer will be
         // fenced if we directly call InitProducerId.
-        if (!(lastError instanceof InvalidPidMappingException)) {
+        boolean needEndTxn = !(abortableError instanceof InvalidPidMappingException)

Review comment:
       If this is the case, could we just remove this intermediate variable?







[GitHub] [kafka] zhaohaidao closed pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao closed pull request #9311:
URL: https://github.com/apache/kafka/pull/9311


   





[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r505722145



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,9 +388,14 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
-              Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch || producerEpoch < txnMetadata.producerEpoch) {

Review comment:
       done







[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502917729



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -580,6 +580,52 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertTrue(transactionalRecords.isEmpty)
   }
 
+  @Test
+  def testProducerUsableAfterTxnTimeOutAbortExcetion(): Unit = {
+    val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
+
+    producer.initTransactions()
+    producer.beginTransaction()
+
+    // The first message and hence the first AddPartitions request should be successfully sent.
+    val firstMessageResult = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false)).get()
+    assertTrue(firstMessageResult.hasOffset)
+
+    // Wait for the expiration cycle to kick in.
+    Thread.sleep(600)
+
+    try {
+      // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
+      producer.commitTransaction()
+      fail("should have raised a TransactionTimeOutException since the transaction has expired")
+    } catch {
+      case _: TransactionTimeoutException =>
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[TransactionTimeoutException])
+    }
+
+    val transactionalConsumer = transactionalConsumers.head
+    transactionalConsumer.subscribe(List(topic1).asJava)
+
+    val transactionalRecords = TestUtils.consumeRecordsFor(transactionalConsumer, 1000)
+    assertTrue(transactionalRecords.isEmpty)
+
+    producer.abortTransaction()
+    producer.beginTransaction()
+    // The second message and hence the second AddPartitions request should be successfully sent.

Review comment:
       This comment refers to other test cases and is meaningless here. I will remove it.







[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502820041



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -580,6 +580,52 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertTrue(transactionalRecords.isEmpty)
   }
 
+  @Test
+  def testProducerUsableAfterTxnTimeOutAbortExcetion(): Unit = {
+    val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
+
+    producer.initTransactions()
+    producer.beginTransaction()
+
+    // The first message and hence the first AddPartitions request should be successfully sent.
+    val firstMessageResult = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false)).get()
+    assertTrue(firstMessageResult.hasOffset)
+
+    // Wait for the expiration cycle to kick in.
+    Thread.sleep(600)
+
+    try {
+      // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
+      producer.commitTransaction()
+      fail("should have raised a TransactionTimeOutException since the transaction has expired")
+    } catch {
+      case _: TransactionTimeoutException =>
+      case e: ExecutionException =>

Review comment:
       Why would the txn timeout be wrapped in an ExecutionException?
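       Background for this question, as a JDK-only sketch: a failure surfaced through `Future.get()` arrives wrapped in `java.util.concurrent.ExecutionException`, with the original exception as its cause, whereas a method that rethrows the failure directly throws the exception itself. The test calls `producer.commitTransaction()` rather than `send(...).get()`, and its first `case` already expects the unwrapped exception, so the `ExecutionException` branch would only matter if the error surfaced through a future. `TimedOutStandIn` is a hypothetical placeholder for `TransactionTimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class WrappingDemo {
    // Hypothetical stand-in for TransactionTimeoutException.
    static class TimedOutStandIn extends RuntimeException {
        TimedOutStandIn(String msg) { super(msg); }
    }

    /** Direct call: the exception propagates unwrapped. */
    static void commitDirectly() {
        throw new TimedOutStandIn("transaction timed out");
    }

    /** Future-based call: get() wraps the failure in an ExecutionException. */
    static Throwable failureViaFuture() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new TimedOutStandIn("transaction timed out"));
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e; // the original exception is available as e.getCause()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        try {
            commitDirectly();
        } catch (TimedOutStandIn e) {
            System.out.println("direct: " + e.getClass().getSimpleName());
        }
        Throwable t = failureViaFuture();
        System.out.println("future: " + t.getClass().getSimpleName()
                + " caused by " + t.getCause().getClass().getSimpleName());
    }
}
```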







[GitHub] [kafka] abbccdda commented on pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#issuecomment-842758541


   retest this please





[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r505721580



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -671,6 +782,57 @@ class KafkaApisTest {
     }
   }
 
+  @Test
+  def shouldReplaceTxnTimeoutWithInvalidProducerEpochInEndTxnWithOlderClient(): Unit = {

Review comment:
       Correct me if I misunderstood your comment.
   The following error code replacement tests for TxnTimeout have been added:
   shouldReplaceTxnTimeoutWithInvalidProducerEpochInAddOffsetToTxnWithOlderClient
   shouldReplaceTxnTimeoutWithInvalidProducerEpochInAddPartitionToTxnWithOlderClient
   shouldReplaceTxnTimeoutWithInvalidProducerEpochInEndTxnWithOlderClient







[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502819657



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -580,6 +580,52 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertTrue(transactionalRecords.isEmpty)
   }
 
+  @Test
+  def testProducerUsableAfterTxnTimeOutAbortExcetion(): Unit = {
+    val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
+
+    producer.initTransactions()
+    producer.beginTransaction()
+
+    // The first message and hence the first AddPartitions request should be successfully sent.
+    val firstMessageResult = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false)).get()
+    assertTrue(firstMessageResult.hasOffset)
+
+    // Wait for the expiration cycle to kick in.
+    Thread.sleep(600)
+
+    try {
+      // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
+      producer.commitTransaction()
+      fail("should have raised a TransactionTimeOutException since the transaction has expired")
+    } catch {
+      case _: TransactionTimeoutException =>
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[TransactionTimeoutException])
+    }
+
+    val transactionalConsumer = transactionalConsumers.head
+    transactionalConsumer.subscribe(List(topic1).asJava)
+
+    val transactionalRecords = TestUtils.consumeRecordsFor(transactionalConsumer, 1000)
+    assertTrue(transactionalRecords.isEmpty)
+
+    producer.abortTransaction()
+    producer.beginTransaction()
+    // The second message and hence the second AddPartitions request should be successfully sent.

Review comment:
       What does this comment suggest?







[GitHub] [kafka] zhaohaidao commented on pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#issuecomment-706734038


   @abbccdda @hachikuji The comments have been addressed; could you continue reviewing the PR?





[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r505722044



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1438,7 +1455,8 @@ public void handleResponse(AbstractResponse response) {
                     log.debug("Did not attempt to add partition {} to transaction because other partitions in the " +
                             "batch had errors.", topicPartition);
                     hasPartitionErrors = true;
-                } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
+                } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING

Review comment:
       done







[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r491655717



##########
File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the coordinator.
+ * When encountering this exception, the producer should retry initialization with current epoch.
+ */
+public class TransactionTimeoutException extends ApiException {

Review comment:
       `TransactionTimedOut`

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the coordinator.
+ * When encountering this exception, the producer should retry initialization with current epoch.
+ */
+public class TransactionTimeoutException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TransactionTimeoutException() {
+        super();
+    }
+
+    public TransactionTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public TransactionTimeoutException(String message) {

Review comment:
       Do we need all the other constructors?
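       If only the message constructor turns out to be needed, the class could shrink to something like the sketch below. It is hedged in two ways: it uses a `TransactionTimedOutException` name based on the naming suggestion earlier in this review, which is an assumption about the final name, and it extends `RuntimeException` as a stand-in for Kafka's `ApiException` so that it compiles without Kafka on the classpath.

```java
// Hypothetical trimmed-down exception; RuntimeException stands in for ApiException.
class TransactionTimedOutException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    // The single constructor kept in this sketch.
    public TransactionTimedOutException(String message) {
        super(message);
    }

    public static void main(String[] args) {
        try {
            throw new TransactionTimedOutException("The transaction timed out on the coordinator");
        } catch (TransactionTimedOutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```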

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1577,6 +1578,59 @@ public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn()
         verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
     }
 
+    @Test
+    public void testTxnTimeoutForAddPartitionsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, tp0, epoch, producerId);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutForAddOffsetsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(consumerGroupId));
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, consumerGroupId, producerId, epoch);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutInEndTxn() throws InterruptedException {
+
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+        prepareEndTxnResponse(Errors.TRANSACTION_TIMED_OUT, TransactionResult.COMMIT, producerId, epoch);
+
+        runUntil(commitResult::isCompleted);
+        runUntil(responseFuture::isDone);
+

Review comment:
       nit: remove empty line
   
   

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1577,6 +1578,59 @@ public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn()
         verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
     }
 
+    @Test
+    public void testTxnTimeoutForAddPartitionsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, tp0, epoch, producerId);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutForAddOffsetsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(consumerGroupId));
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, consumerGroupId, producerId, epoch);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutInEndTxn() throws InterruptedException {
+

Review comment:
       nit: remove empty line

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1072,7 +1076,11 @@ private void transitionTo(State target, RuntimeException error) {
             if (error == null)
                 throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
             lastError = error;
+            abortableError = error;
         } else {
+            if (target != State.ABORTING_TRANSACTION) {

Review comment:
       Does this relate to the optimization https://issues.apache.org/jira/browse/KAFKA-10504?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -2959,8 +3035,7 @@ public void testBumpTransactionalEpochOnRecoverableAddPartitionRequestError() {
         assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
     }
 
-    @Test
-    public void testBumpTransactionalEpochOnRecoverableAddOffsetsRequestError() throws InterruptedException {
+    private void verifyBumpTransactionalEpochOnRecoverableAddOffsetsRequestError(Errors errors) throws InterruptedException {

Review comment:
       nit: this function could be placed after its callers.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1585,6 +1596,8 @@ public void handleResponse(AbstractResponse response) {
                 // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
                 // just treat it the same as PRODUCE_FENCED.
                 fatalError(Errors.PRODUCER_FENCED.exception());
+            } else if (error == Errors.TRANSACTION_TIMED_OUT) {

Review comment:
       Could we merge this with the other cases at L1452?

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {
+                Left(Errors.TRANSACTION_TIMED_OUT)
+              } else {
+                Left(Errors.PRODUCER_FENCED)
+              }
+            } else if (producerEpoch < txnMetadata.producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            } else if (txnMetadata.pendingTransitionInProgress
+              && !txnMetadata.pendingState.contains(PrepareEpochFence)
+              && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                   PrepareCommit
-                else
+                else {
                   PrepareAbort
-
-                if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
+                }
+                if (nextState == PrepareAbort && (txnMetadata.pendingState.get == PrepareEpochFence
+                  || txnMetadata.pendingState.get == PrepareEpochBumpThenAbort)) {
                   // We should clear the pending state to make way for the transition to PrepareAbort and also bump
                   // the epoch in the transaction metadata we are about to append.
-                  isEpochFence = true
+                  isEpochFence = txnMetadata.pendingState.get == PrepareEpochFence

Review comment:
       Correct me if I'm wrong, but it seems the reason for adding `PrepareEpochBumpThenAbort` is to avoid setting `lastProducerEpoch` to -1 in the timed-out state. If so, is there a simpler way to set `lastProducerEpoch` when we make the transition, as is done in `prepareTransitionTo`?
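       The revised validation in this hunk can be read as a small decision function. The sketch below transcribes its order of checks into Java under a simplified model: `TxnMeta` and the `PendingState` enum are hypothetical stand-ins for `TransactionMetadata` and its pending states (with `PREPARE_EPOCH_BUMP_THEN_ABORT` standing for the state introduced in this diff), `pendingTransitionInProgress` is modeled as `pendingState.isPresent()`, and Scala's `Option.contains` becomes an `Optional.equals` comparison. The key case is a client request whose epoch no longer matches but equals `lastProducerEpoch`, the epoch before the coordinator's last bump: that producer gets `TRANSACTION_TIMED_OUT` instead of `PRODUCER_FENCED`.

```java
import java.util.Optional;

class EndTxnEpochCheck {
    enum Errors { NONE, INVALID_PRODUCER_ID_MAPPING, TRANSACTION_TIMED_OUT, PRODUCER_FENCED, CONCURRENT_TRANSACTIONS }
    enum PendingState { PREPARE_EPOCH_FENCE, PREPARE_EPOCH_BUMP_THEN_ABORT, PREPARE_COMMIT }

    // Hypothetical stand-in for the TransactionMetadata fields the check reads.
    static final class TxnMeta {
        final long producerId;
        final short producerEpoch;
        final short lastProducerEpoch;
        final Optional<PendingState> pendingState;

        TxnMeta(long producerId, short producerEpoch, short lastProducerEpoch, Optional<PendingState> pendingState) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.lastProducerEpoch = lastProducerEpoch;
            this.pendingState = pendingState;
        }
    }

    /** Follows the order of checks in the diff; NONE means "proceed to the state match". */
    static Errors validate(TxnMeta meta, long producerId, short producerEpoch, boolean isFromClient) {
        if (meta.producerId != producerId)
            return Errors.INVALID_PRODUCER_ID_MAPPING;
        if (isFromClient && producerEpoch != meta.producerEpoch)
            // A client still on the pre-bump epoch timed out; any other mismatch means it was fenced.
            return producerEpoch == meta.lastProducerEpoch ? Errors.TRANSACTION_TIMED_OUT : Errors.PRODUCER_FENCED;
        if (producerEpoch < meta.producerEpoch)
            return Errors.PRODUCER_FENCED;
        if (meta.pendingState.isPresent()
                && !meta.pendingState.equals(Optional.of(PendingState.PREPARE_EPOCH_FENCE))
                && !meta.pendingState.equals(Optional.of(PendingState.PREPARE_EPOCH_BUMP_THEN_ABORT)))
            return Errors.CONCURRENT_TRANSACTIONS;
        return Errors.NONE;
    }

    public static void main(String[] args) {
        // Coordinator bumped the epoch from 5 to 6 when it aborted the timed-out transaction.
        TxnMeta meta = new TxnMeta(42L, (short) 6, (short) 5, Optional.empty());
        System.out.println(validate(meta, 42L, (short) 5, true)); // stale client epoch
        System.out.println(validate(meta, 42L, (short) 3, true)); // older, unrelated epoch
        System.out.println(validate(meta, 42L, (short) 6, true)); // current epoch
    }
}
```

       With the sample metadata, the three calls in `main` yield `TRANSACTION_TIMED_OUT`, `PRODUCER_FENCED` and `NONE` respectively.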

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2045,9 +2045,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
       def sendResponseCallback(error: Errors): Unit = {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val needOld = endTxnRequest.version < 2 && (error == Errors.PRODUCER_FENCED || error == Errors
+            .TRANSACTION_TIMED_OUT)
           val finalError =
-            if (endTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) {
-              // For older clients, they could not understand the new PRODUCER_FENCED error code,
+            if (needOld) {

Review comment:
       We should add a helper that returns the old error code, such as `maybeReturnOldClientError(Errors error, short requestVersion)`.
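
A minimal sketch of such a helper, written in Java for illustration (KafkaApis itself is Scala). `Err` is a hypothetical stand-in for `Errors`, and mapping both newer codes to `INVALID_PRODUCER_EPOCH` for request versions below 2 is an assumption drawn from the diff above and the `// for version <=1` notes in the response javadocs. This version returns the error to send rather than a boolean flag.

```java
import java.util.EnumSet;
import java.util.Set;

final class OldClientErrors {
    /** Hypothetical stand-in for org.apache.kafka.common.protocol.Errors. */
    enum Err { NONE, INVALID_PRODUCER_EPOCH, PRODUCER_FENCED, TRANSACTION_TIMED_OUT }

    // Codes assumed to be undecodable by clients sending request versions below 2.
    private static final Set<Err> NEWER_CODES =
            EnumSet.of(Err.PRODUCER_FENCED, Err.TRANSACTION_TIMED_OUT);

    private OldClientErrors() {}

    /**
     * Hypothetical helper: returns the error to put in the response, downgrading
     * newer codes to INVALID_PRODUCER_EPOCH when the request version is below 2.
     */
    static Err maybeReturnOldClientError(Err error, short requestVersion) {
        if (requestVersion < 2 && NEWER_CODES.contains(error)) {
            return Err.INVALID_PRODUCER_EPOCH;
        }
        return error;
    }
}
```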

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {
+                Left(Errors.TRANSACTION_TIMED_OUT)
+              } else {
+                Left(Errors.PRODUCER_FENCED)
+              }
+            } else if (producerEpoch < txnMetadata.producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            } else if (txnMetadata.pendingTransitionInProgress
+              && !txnMetadata.pendingState.contains(PrepareEpochFence)
+              && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                   PrepareCommit
-                else
+                else {

Review comment:
       nit: could remove the braces


[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502936935



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -580,6 +580,52 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertTrue(transactionalRecords.isEmpty)
   }
 
+  @Test
+  def testProducerUsableAfterTxnTimeOutAbortExcetion(): Unit = {

Review comment:
       OK. I've added the reverse verification.


[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r497217290



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {
+                Left(Errors.TRANSACTION_TIMED_OUT)
+              } else {
+                Left(Errors.PRODUCER_FENCED)
+              }
+            } else if (producerEpoch < txnMetadata.producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            } else if (txnMetadata.pendingTransitionInProgress
+              && !txnMetadata.pendingState.contains(PrepareEpochFence)
+              && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                   PrepareCommit
-                else
+                else {
                   PrepareAbort
-
-                if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
+                }
+                if (nextState == PrepareAbort && (txnMetadata.pendingState.get == PrepareEpochFence
+                  || txnMetadata.pendingState.get == PrepareEpochBumpThenAbort)) {
                   // We should clear the pending state to make way for the transition to PrepareAbort and also bump
                   // the epoch in the transaction metadata we are about to append.
-                  isEpochFence = true
+                  isEpochFence = txnMetadata.pendingState.get == PrepareEpochFence

Review comment:
       Thanks for the advice. I found a simpler way to set `lastProducerEpoch`, so `PrepareEpochBumpThenAbort` has been removed.


[GitHub] [kafka] hachikuji commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r501170471



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2083,8 +2083,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       def sendResponseCallback(error: Errors): Unit = {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
           val finalError =
-            if (endTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) {
-              // For older clients, they could not understand the new PRODUCER_FENCED error code,
+            if (maybeReturnOldClientError(error, endTxnRequest.version)) {

Review comment:
       nit: I am not sure this helper is worthwhile because it only works if the request versions are aligned consistently across APIs. That is true now, but likely won't be in the future. Explicit version checks for each API would be safer.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2336,6 +2335,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def maybeReturnOldClientError(error: Errors, requestVersion: Short): Boolean = {
+    requestVersion < 2 && (error == Errors.PRODUCER_FENCED || error == Errors

Review comment:
       @abbccdda clarified offline that the version has already been bumped for 2.7, so this is not needed.


[GitHub] [kafka] hachikuji commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r501204644



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -555,11 +555,11 @@ class TransactionsTest extends KafkaServerTestHarness {
     try {
       // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
       producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
-      fail("should have raised a ProducerFencedException since the transaction has expired")
+      fail("should have raised a TransactionTimeOutException since the transaction has expired")
     } catch {
-      case _: ProducerFencedException =>
+      case _: TransactionTimeOutException =>
       case e: ExecutionException =>
-      assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+      assertTrue(e.getCause.isInstanceOf[TransactionTimeOutException])

Review comment:
       The main thing we need to test here is that the producer remains usable after the timeout.

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,9 +386,16 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {

Review comment:
       The code assumes that if we receive an EndTxn request and `lastProducerEpoch` has been set, it must be because the coordinator timed out the transaction. That is definitely true in the common case, but I'm wondering if it is worth adding some state to `TransactionMetadata` that explicitly indicates that the transaction was timed out. Not super important, and it could be done in a follow-up.
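
A Java sketch of the epoch check with such an explicit flag (the coordinator itself is Scala). `TxnMeta` and its `timedOutByCoordinator` field are hypothetical; the flag is assumed to be set only when the coordinator aborts a transaction on timeout, so a matching `lastProducerEpoch` alone is no longer enough to report `TRANSACTION_TIMED_OUT`.

```java
final class EndTxnEpochCheck {
    enum Result { CONTINUE, PRODUCER_FENCED, TRANSACTION_TIMED_OUT }

    /** Hypothetical slice of TransactionMetadata with an explicit timed-out flag. */
    static final class TxnMeta {
        final short producerEpoch;
        final short lastProducerEpoch;
        final boolean timedOutByCoordinator; // hypothetical: set only by the timeout abort path

        TxnMeta(short producerEpoch, short lastProducerEpoch, boolean timedOutByCoordinator) {
            this.producerEpoch = producerEpoch;
            this.lastProducerEpoch = lastProducerEpoch;
            this.timedOutByCoordinator = timedOutByCoordinator;
        }
    }

    private EndTxnEpochCheck() {}

    static Result check(TxnMeta meta, short requestEpoch, boolean isFromClient) {
        // Client requests must match the current epoch exactly.
        if (isFromClient && requestEpoch != meta.producerEpoch) {
            // Report a timeout only when the flag confirms it.
            if (meta.timedOutByCoordinator && requestEpoch == meta.lastProducerEpoch) {
                return Result.TRANSACTION_TIMED_OUT;
            }
            return Result.PRODUCER_FENCED;
        }
        // Requests not from the client are rejected only if their epoch is older.
        if (requestEpoch < meta.producerEpoch) {
            return Result.PRODUCER_FENCED;
        }
        return Result.CONTINUE;
    }
}
```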

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -364,7 +368,8 @@ class TransactionCoordinator(brokerId: Int,
                              producerEpoch: Short,
                              txnMarkerResult: TransactionResult,
                              isFromClient: Boolean,
-                             responseCallback: EndTxnCallback): Unit = {
+                             responseCallback: EndTxnCallback,
+                             timeoutAbort: Boolean = false): Unit = {

Review comment:
       nit: can we avoid optional (default-valued) parameters? It is better for the caller to be explicit.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1589,7 +1607,8 @@ public void handleResponse(AbstractResponse response) {
                 fatalError(error.exception());
             } else if (error == Errors.INVALID_TXN_STATE) {
                 fatalError(error.exception());
-            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
+            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING
+                    || error == Errors.TRANSACTION_TIMED_OUT) {

Review comment:
       The documentation suggests that after catching `TransactionTimeoutException`, the user can just begin a new transaction. Unless I'm missing something, however, it seems like we still require an explicit call to `abortTransaction`. That is actually what I prefer, but we should clarify the expectation in the documentation. It is super important for the integration test to cover the full expected flow.
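
The expected flow described here, where the user must call `abortTransaction` before beginning a new transaction, can be modeled with a toy state machine. This is not `KafkaProducer` or `TransactionManager`: `ToyTxnProducer`, its three states, and the epoch-bump counter are hypothetical simplifications, useful mainly for spelling out what an integration test should exercise.

```java
final class ToyTxnProducer {
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR }

    private State state = State.READY;
    private int epochBumps = 0;

    State state() { return state; }

    int epochBumps() { return epochBumps; }

    void beginTransaction() {
        if (state != State.READY) {
            throw new IllegalStateException("Cannot begin a transaction in state " + state);
        }
        state = State.IN_TRANSACTION;
    }

    /** Simulates a TRANSACTION_TIMED_OUT response to an in-flight transactional request. */
    void onTransactionTimedOut() {
        if (state == State.IN_TRANSACTION) {
            state = State.ABORTABLE_ERROR;
        }
    }

    void abortTransaction() {
        if (state == State.ABORTABLE_ERROR) {
            // Stand-in for the internal InitProducerId round trip that recovers with a newer epoch.
            epochBumps++;
        } else if (state != State.IN_TRANSACTION) {
            throw new IllegalStateException("No transaction to abort in state " + state);
        }
        state = State.READY;
    }
}
```

In this model, calling `beginTransaction()` right after the timeout throws, and it succeeds again only after `abortTransaction()`, with the same producer instance.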


[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502917518



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -580,6 +580,52 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertTrue(transactionalRecords.isEmpty)
   }
 
+  @Test
+  def testProducerUsableAfterTxnTimeOutAbortExcetion(): Unit = {
+    val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
+
+    producer.initTransactions()
+    producer.beginTransaction()
+
+    // The first message and hence the first AddPartitions request should be successfully sent.
+    val firstMessageResult = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false)).get()
+    assertTrue(firstMessageResult.hasOffset)
+
+    // Wait for the expiration cycle to kick in.
+    Thread.sleep(600)
+
+    try {
+      // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
+      producer.commitTransaction()
+      fail("should have raised a TransactionTimeOutException since the transaction has expired")
+    } catch {
+      case _: TransactionTimeoutException =>
+      case e: ExecutionException =>

Review comment:
       This wrapping follows `testFencingOnAddPartitions`, but it doesn't seem that we need it here.


[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r493708455



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {
+                Left(Errors.TRANSACTION_TIMED_OUT)
+              } else {
+                Left(Errors.PRODUCER_FENCED)
+              }
+            } else if (producerEpoch < txnMetadata.producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            } else if (txnMetadata.pendingTransitionInProgress
+              && !txnMetadata.pendingState.contains(PrepareEpochFence)
+              && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                   PrepareCommit
-                else
+                else {
                   PrepareAbort
-
-                if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
+                }
+                if (nextState == PrepareAbort && (txnMetadata.pendingState.get == PrepareEpochFence
+                  || txnMetadata.pendingState.get == PrepareEpochBumpThenAbort)) {
                   // We should clear the pending state to make way for the transition to PrepareAbort and also bump
                   // the epoch in the transaction metadata we are about to append.
-                  isEpochFence = true
+                  isEpochFence = txnMetadata.pendingState.get == PrepareEpochFence

Review comment:
       As I understand it, `PrepareEpochFence` and `PrepareEpochBumpThenAbort` have different semantics. If a new state is not introduced, what should be passed as the `newState` parameter when `transitionTo` is called?
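
A Java sketch of the `Ongoing` branch from the diff, to make the difference between the two pending states concrete. The enum and class names are hypothetical renderings of the Scala states. The sketch checks presence before comparing, like the original `contains` call; `Option.get` on an empty `pendingState` would appear to throw, and an ordinary client abort reaches this branch with no pending transition.

```java
import java.util.Optional;

final class AbortTransitionSketch {
    enum TxnState { ONGOING, PREPARE_COMMIT, PREPARE_ABORT, PREPARE_EPOCH_FENCE, PREPARE_EPOCH_BUMP_THEN_ABORT }

    static final class Decision {
        final TxnState nextState;
        final boolean clearPendingAndBumpEpoch;
        final boolean isEpochFence;

        Decision(TxnState nextState, boolean clearPendingAndBumpEpoch, boolean isEpochFence) {
            this.nextState = nextState;
            this.clearPendingAndBumpEpoch = clearPendingAndBumpEpoch;
            this.isEpochFence = isEpochFence;
        }
    }

    private AbortTransitionSketch() {}

    /** EndTxn handling for a transaction in the Ongoing state, following the diff. */
    static Decision fromOngoing(boolean commit, Optional<TxnState> pendingState) {
        TxnState next = commit ? TxnState.PREPARE_COMMIT : TxnState.PREPARE_ABORT;
        // Optional.equals is presence-safe: an empty pendingState matches neither value.
        boolean fencePending = pendingState.equals(Optional.of(TxnState.PREPARE_EPOCH_FENCE));
        boolean timeoutPending = pendingState.equals(Optional.of(TxnState.PREPARE_EPOCH_BUMP_THEN_ABORT));
        if (next == TxnState.PREPARE_ABORT && (fencePending || timeoutPending)) {
            // Both pending states make way for PrepareAbort with a bumped epoch,
            // but only PrepareEpochFence marks the abort as an epoch fence.
            return new Decision(next, true, fencePending);
        }
        return new Decision(next, false, false);
    }
}
```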


[GitHub] [kafka] hachikuji commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r501168361



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2336,6 +2335,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def maybeReturnOldClientError(error: Errors, requestVersion: Short): Boolean = {
+    requestVersion < 2 && (error == Errors.PRODUCER_FENCED || error == Errors

Review comment:
       We need to bump the protocol versions for the new error code. Otherwise older clients using version 2 will not know how to handle `TRANSACTION_TIMED_OUT` and will translate it to `UNKNOWN_SERVER_ERROR`, which is a regression.
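
To illustrate the concern, a Java sketch of gating the new code on the request version, tracked separately per API. The `Api`/`Err` enums and the placeholder minimum version of 3 are hypothetical. Falling back to `PRODUCER_FENCED` is also an assumption, based on the test changes in this PR showing that expired transactions previously surfaced as `ProducerFencedException`. Without such a gate, an older client would decode the unfamiliar code as `UNKNOWN_SERVER_ERROR`.

```java
import java.util.EnumMap;
import java.util.Map;

final class TimedOutVersionGate {
    enum Api { ADD_PARTITIONS_TO_TXN, ADD_OFFSETS_TO_TXN, END_TXN }

    enum Err { NONE, INVALID_PRODUCER_EPOCH, PRODUCER_FENCED, TRANSACTION_TIMED_OUT }

    // HYPOTHETICAL placeholder versions: the first version of each API allowed to return
    // TRANSACTION_TIMED_OUT. Kept per API so one API's numbering is never assumed for another.
    private static final Map<Api, Short> MIN_TIMED_OUT_VERSION = new EnumMap<>(Api.class);

    static {
        MIN_TIMED_OUT_VERSION.put(Api.ADD_PARTITIONS_TO_TXN, (short) 3);
        MIN_TIMED_OUT_VERSION.put(Api.ADD_OFFSETS_TO_TXN, (short) 3);
        MIN_TIMED_OUT_VERSION.put(Api.END_TXN, (short) 3);
    }

    private TimedOutVersionGate() {}

    /** Chooses an error code that a client at the given request version can decode. */
    static Err errorForVersion(Api api, short version, Err error) {
        if (error == Err.TRANSACTION_TIMED_OUT && version < MIN_TIMED_OUT_VERSION.get(api)) {
            // Assumed fallback: the fencing error older clients already handle.
            return Err.PRODUCER_FENCED;
        }
        return error;
    }
}
```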


[GitHub] [kafka] abbccdda closed pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda closed pull request #9311:
URL: https://github.com/apache/kafka/pull/9311


[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502752681



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -555,11 +555,11 @@ class TransactionsTest extends KafkaServerTestHarness {
     try {
       // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
       producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
-      fail("should have raised a ProducerFencedException since the transaction has expired")
+      fail("should have raised a TransactionTimeOutException since the transaction has expired")
     } catch {
-      case _: ProducerFencedException =>
+      case _: TransactionTimeOutException =>
       case e: ExecutionException =>
-      assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+      assertTrue(e.getCause.isInstanceOf[TransactionTimeOutException])

Review comment:
       Thanks for the reminder. I added a case to verify that the producer remains usable after the timeout:
       https://github.com/apache/kafka/pull/9311/files#diff-231b8aacf24c8458f322b4a9f872d4efR584


[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502740233



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -369,7 +372,9 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
         // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server will not accept an EndTxnRequest, so skip
         // directly to InitProducerId. Otherwise, we must first abort the transaction, because the producer will be
         // fenced if we directly call InitProducerId.
-        if (!(lastError instanceof InvalidPidMappingException)) {
+        boolean needEndTxn = !(abortableError instanceof InvalidPidMappingException)

Review comment:
       I am not sure this helper is worthwhile because it is only used in beginCompletingTransaction.


[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r505707007



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1198,18 +1209,22 @@ boolean canBumpEpoch() {
         return coordinatorSupportsBumpingEpoch;
     }
 
+    private void resetTransactions() {
+        newPartitionsInTransaction.clear();
+        pendingPartitionsInTransaction.clear();
+        partitionsInTransaction.clear();
+    }
+
     private void completeTransaction() {
         if (epochBumpRequired) {
             transitionTo(State.INITIALIZING);
         } else {
             transitionTo(State.READY);
         }
         lastError = null;
+        abortableError = null;
         epochBumpRequired = false;
-        transactionStarted = false;

Review comment:
       Thanks for the reminder.


[GitHub] [kafka] hachikuji commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r501158001



##########
File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeOutException.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the coordinator.
+ * When encountering this exception, the producer should retry initialization with current epoch.
+ */
+public class TransactionTimeOutException extends ApiException {

Review comment:
       Can we change the name to `TransactionTimeoutException`? Using a lower-case 'o' seems more consistent (see e.g. `TimeoutException`).


[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r501155923



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1198,18 +1209,23 @@ boolean canBumpEpoch() {
         return coordinatorSupportsBumpingEpoch;
     }
 
+    private void resetTransactions() {
+        newPartitionsInTransaction.clear();
+        pendingPartitionsInTransaction.clear();
+        partitionsInTransaction.clear();
+

Review comment:
       nit: remove the blank line

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
##########
@@ -40,6 +40,7 @@
  *   - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
  *   - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
  *   - {@link Errors#PRODUCER_FENCED}
+ *   - {@link Errors#TRANSACTION_TIMED_OUT}

Review comment:
       We should also update the comments in the corresponding JSON files to mention the new error code, such as the AddPartitionsToTxn, AddOffsetsToTxn, and EndTxn request/response JSON files.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
##########
@@ -34,6 +34,7 @@
  *   - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
  *   - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
  *   - {@link Errors#PRODUCER_FENCED}
+ *   - {@link Errors#TRANSACTION_TIMED_OUT}

Review comment:
       We should also add this to the `AddOffsetsToTxnResponse`

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,9 +386,16 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {

Review comment:
       I'm not sure the logic here is actually simpler, since we still return `PRODUCER_FENCED` in two places. If it isn't, we could just nest the inner if-else inside the existing condition `((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)`.
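
A Java sketch comparing the two shapes of the epoch check (the coordinator itself is Scala). The method names and parameters are hypothetical, with `lastEpoch` standing in for `txnMetadata.lastProducerEpoch`.

```java
final class FencingConditionShapes {
    enum Result { CONTINUE, PRODUCER_FENCED, TRANSACTION_TIMED_OUT }

    private FencingConditionShapes() {}

    /** Shape in the PR: client mismatches and stale epochs are handled in separate branches. */
    static Result separateBranches(boolean isFromClient, short requestEpoch, short currentEpoch, short lastEpoch) {
        if (isFromClient && requestEpoch != currentEpoch) {
            return requestEpoch == lastEpoch ? Result.TRANSACTION_TIMED_OUT : Result.PRODUCER_FENCED;
        } else if (requestEpoch < currentEpoch) {
            return Result.PRODUCER_FENCED;
        }
        return Result.CONTINUE;
    }

    /** Shape suggested in review: keep the original combined condition and nest the if-else inside it. */
    static Result combinedCondition(boolean isFromClient, short requestEpoch, short currentEpoch, short lastEpoch) {
        if ((isFromClient && requestEpoch != currentEpoch) || requestEpoch < currentEpoch) {
            return requestEpoch == lastEpoch ? Result.TRANSACTION_TIMED_OUT : Result.PRODUCER_FENCED;
        }
        return Result.CONTINUE;
    }
}
```

For client requests the two shapes agree. They differ for a non-client request whose epoch is older than the current one but equal to `lastProducerEpoch`: the first returns `PRODUCER_FENCED`, the second `TRANSACTION_TIMED_OUT`. Guarding the nested check with `isFromClient` would remove that difference if it is unintended.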

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -369,7 +372,9 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
         // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server will not accept an EndTxnRequest, so skip
         // directly to InitProducerId. Otherwise, we must first abort the transaction, because the producer will be
         // fenced if we directly call InitProducerId.
-        if (!(lastError instanceof InvalidPidMappingException)) {
+        boolean needEndTxn = !(abortableError instanceof InvalidPidMappingException)

Review comment:
       We should also update the comment above. I also think we could extract `needEndTxn` into a helper function, if checking `abortableError` is always how we decide whether a transaction needs an EndTxn.
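
A sketch of what an extracted helper could look like, in Java and assuming the epoch-bump path described by the code comment in the diff. `InvalidPidMapping`, `needEndTxn`, and `requestsFor` are hypothetical names. Only the `InvalidPidMappingException` condition visible in the diff is modeled, because the rest of that line is truncated.

```java
import java.util.ArrayList;
import java.util.List;

final class AbortRecoveryPlan {
    /** Hypothetical stand-in for InvalidPidMappingException. */
    static final class InvalidPidMapping extends RuntimeException {
        InvalidPidMapping(String message) {
            super(message);
        }
    }

    private AbortRecoveryPlan() {}

    /** Hypothetical helper: true if an EndTxn(ABORT) must be sent before InitProducerId. */
    static boolean needEndTxn(Throwable abortableError) {
        // The coordinator will not accept EndTxn after INVALID_PRODUCER_ID_MAPPING.
        return !(abortableError instanceof InvalidPidMapping);
    }

    /** Requests sent, in order, when completing an abort that also requires an epoch bump. */
    static List<String> requestsFor(Throwable abortableError) {
        List<String> requests = new ArrayList<>();
        if (needEndTxn(abortableError)) {
            // Abort first: calling InitProducerId directly would fence the producer.
            requests.add("EndTxn(ABORT)");
        }
        requests.add("InitProducerId");
        return requests;
    }
}
```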


[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502818179



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2083,8 +2083,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       def sendResponseCallback(error: Errors): Unit = {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
           val finalError =
-            if (endTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) {
-              // For older clients, they could not understand the new PRODUCER_FENCED error code,
+            if (maybeReturnOldClientError(error, endTxnRequest.version)) {

Review comment:
       I see your point; we could get rid of the helper.


[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r491655717



##########
File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the coordinator.
+ * When encountering this exception, the producer should retry initialization with current epoch.
+ */
+public class TransactionTimeoutException extends ApiException {

Review comment:
       `TransactionTimedOut`

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the coordinator.
+ * When encountering this exception, the producer should retry initialization with current epoch.
+ */
+public class TransactionTimeoutException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TransactionTimeoutException() {
+        super();
+    }
+
+    public TransactionTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public TransactionTimeoutException(String message) {

Review comment:
       Do we need all the other constructors?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1577,6 +1578,59 @@ public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn()
         verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
     }
 
+    @Test
+    public void testTxnTimeoutForAddPartitionsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, tp0, epoch, producerId);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutForAddOffsetsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(consumerGroupId));
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, consumerGroupId, producerId, epoch);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutInEndTxn() throws InterruptedException {
+
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+        prepareEndTxnResponse(Errors.TRANSACTION_TIMED_OUT, TransactionResult.COMMIT, producerId, epoch);
+
+        runUntil(commitResult::isCompleted);
+        runUntil(responseFuture::isDone);
+

Review comment:
       nit: remove empty line
   
   

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1577,6 +1578,59 @@ public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn()
         verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
     }
 
+    @Test
+    public void testTxnTimeoutForAddPartitionsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, tp0, epoch, producerId);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutForAddOffsetsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(consumerGroupId));
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, consumerGroupId, producerId, epoch);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutInEndTxn() throws InterruptedException {
+

Review comment:
       nit: remove empty line

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1072,7 +1076,11 @@ private void transitionTo(State target, RuntimeException error) {
             if (error == null)
                 throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
             lastError = error;
+            abortableError = error;
         } else {
+            if (target != State.ABORTING_TRANSACTION) {

Review comment:
       Does this relate to the optimization https://issues.apache.org/jira/browse/KAFKA-10504?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -2959,8 +3035,7 @@ public void testBumpTransactionalEpochOnRecoverableAddPartitionRequestError() {
         assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
     }
 
-    @Test
-    public void testBumpTransactionalEpochOnRecoverableAddOffsetsRequestError() throws InterruptedException {
+    private void verifyBumpTransactionalEpochOnRecoverableAddOffsetsRequestError(Errors errors) throws InterruptedException {

Review comment:
       nit: this function could be placed after its callers.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1585,6 +1596,8 @@ public void handleResponse(AbstractResponse response) {
                 // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
                 // just treat it the same as PRODUCE_FENCED.
                 fatalError(Errors.PRODUCER_FENCED.exception());
+            } else if (error == Errors.TRANSACTION_TIMED_OUT) {

Review comment:
       Could we merge this with the other cases on L1452?

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {
+                Left(Errors.TRANSACTION_TIMED_OUT)
+              } else {
+                Left(Errors.PRODUCER_FENCED)
+              }
+            } else if (producerEpoch < txnMetadata.producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            } else if (txnMetadata.pendingTransitionInProgress
+              && !txnMetadata.pendingState.contains(PrepareEpochFence)
+              && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                   PrepareCommit
-                else
+                else {
                   PrepareAbort
-
-                if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
+                }
+                if (nextState == PrepareAbort && (txnMetadata.pendingState.get == PrepareEpochFence
+                  || txnMetadata.pendingState.get == PrepareEpochBumpThenAbort)) {
                   // We should clear the pending state to make way for the transition to PrepareAbort and also bump
                   // the epoch in the transaction metadata we are about to append.
-                  isEpochFence = true
+                  isEpochFence = txnMetadata.pendingState.get == PrepareEpochFence

Review comment:
       Correct me if I'm wrong, but it seems the reason for adding `PrepareEpochBumpThenAbort` is to avoid setting `lastProducerEpoch` to -1 in the timed-out state? If so, is there a simpler way to set `lastProducerEpoch` when we make the transition, as we do in `prepareTransitionTo`?
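To make the question concrete, the check order in this hunk can be restated as a small Java sketch (the coordinator itself is Scala). `TxnSnapshot`, `PendingState` and `EndTxnValidation` are hypothetical names; the sketch assumes `pendingTransitionInProgress` simply means that a pending state is present, and it returns `NONE` where the real code goes on to the state transition.

```java
import java.util.Optional;

// Hypothetical stand-in for the subset of Kafka's Errors used by this check.
enum Errors {
    NONE,
    INVALID_PRODUCER_ID_MAPPING,
    TRANSACTION_TIMED_OUT,
    PRODUCER_FENCED,
    CONCURRENT_TRANSACTIONS
}

// Hypothetical stand-in for the pending transaction states referenced in the diff.
enum PendingState {
    PREPARE_EPOCH_FENCE,
    PREPARE_EPOCH_BUMP_THEN_ABORT,
    OTHER
}

// Hypothetical snapshot of the metadata fields the check reads.
final class TxnSnapshot {
    final long producerId;
    final short producerEpoch;
    final short lastProducerEpoch;
    final Optional<PendingState> pendingState;

    TxnSnapshot(long producerId, short producerEpoch, short lastProducerEpoch,
                Optional<PendingState> pendingState) {
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.lastProducerEpoch = lastProducerEpoch;
        this.pendingState = pendingState;
    }
}

final class EndTxnValidation {
    private EndTxnValidation() {}

    // Follows the order of checks in the diff; NONE means "proceed to the state transition".
    static Errors validate(TxnSnapshot txn, long producerId, short producerEpoch, boolean isFromClient) {
        if (txn.producerId != producerId)
            return Errors.INVALID_PRODUCER_ID_MAPPING;
        // Client requests must match the current epoch exactly.
        if (isFromClient && producerEpoch != txn.producerEpoch) {
            // The previous epoch is reported as a timed-out transaction, anything else as fencing.
            return producerEpoch == txn.lastProducerEpoch
                ? Errors.TRANSACTION_TIMED_OUT
                : Errors.PRODUCER_FENCED;
        }
        if (producerEpoch < txn.producerEpoch)
            return Errors.PRODUCER_FENCED;
        // A pending transition blocks the request unless it is one of the two abort paths.
        boolean abortPathPending = txn.pendingState
            .map(s -> s == PendingState.PREPARE_EPOCH_FENCE || s == PendingState.PREPARE_EPOCH_BUMP_THEN_ABORT)
            .orElse(false);
        if (txn.pendingState.isPresent() && !abortPathPending)
            return Errors.CONCURRENT_TRANSACTIONS;
        return Errors.NONE;
    }
}
```

Only client requests can receive `TRANSACTION_TIMED_OUT` here; a non-client request carrying an older epoch still falls through to `PRODUCER_FENCED`.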

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2045,9 +2045,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
       def sendResponseCallback(error: Errors): Unit = {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val needOld = endTxnRequest.version < 2 && (error == Errors.PRODUCER_FENCED || error == Errors
+            .TRANSACTION_TIMED_OUT)
           val finalError =
-            if (endTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) {
-              // For older clients, they could not understand the new PRODUCER_FENCED error code,
+            if (needOld) {

Review comment:
       We should add a helper that returns the old error code, such as `maybeReturnOldClientError(Errors error, short requestVersion)`.
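The suggested helper could look roughly like the sketch below, written in Java for illustration even though `KafkaApis` is Scala. It assumes, as the `needOld` condition in the diff implies, that EndTxn versions below 2 cannot interpret either `PRODUCER_FENCED` or `TRANSACTION_TIMED_OUT`, and that both are downgraded to `INVALID_PRODUCER_EPOCH`; the `Errors` enum is a trimmed stand-in for Kafka's, and `FIRST_VERSION_WITH_NEW_ERRORS` is a hypothetical constant.

```java
// Hypothetical stand-in for org.apache.kafka.common.protocol.Errors (only the codes used here).
enum Errors {
    NONE,
    PRODUCER_FENCED,
    TRANSACTION_TIMED_OUT,
    INVALID_PRODUCER_EPOCH
}

final class OldClientErrors {
    // Assumption: EndTxn request versions below 2 predate both newer error codes.
    static final short FIRST_VERSION_WITH_NEW_ERRORS = 2;

    private OldClientErrors() {}

    // Downgrades PRODUCER_FENCED and TRANSACTION_TIMED_OUT to INVALID_PRODUCER_EPOCH for old
    // clients so they keep their existing handling; every other error passes through unchanged.
    static Errors maybeReturnOldClientError(Errors error, short requestVersion) {
        boolean newErrorCode = error == Errors.PRODUCER_FENCED || error == Errors.TRANSACTION_TIMED_OUT;
        if (requestVersion < FIRST_VERSION_WITH_NEW_ERRORS && newErrorCode) {
            return Errors.INVALID_PRODUCER_EPOCH;
        }
        return error;
    }
}
```

Returning the (possibly downgraded) error rather than a boolean keeps the `finalError` computation to a single call; a later revision of the diff calls the helper as a boolean condition instead.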

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {
+                Left(Errors.TRANSACTION_TIMED_OUT)
+              } else {
+                Left(Errors.PRODUCER_FENCED)
+              }
+            } else if (producerEpoch < txnMetadata.producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            } else if (txnMetadata.pendingTransitionInProgress
+              && !txnMetadata.pendingState.contains(PrepareEpochFence)
+              && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                   PrepareCommit
-                else
+                else {

Review comment:
       nit: we could remove the braces.







[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502744351



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,9 +386,16 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {

Review comment:
       Thanks for your advice. I have created an issue to track this follow-up: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-10596?filter=allissues#







[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r502751077



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1589,7 +1607,8 @@ public void handleResponse(AbstractResponse response) {
                 fatalError(error.exception());
             } else if (error == Errors.INVALID_TXN_STATE) {
                 fatalError(error.exception());
-            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
+            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING
+                    || error == Errors.TRANSACTION_TIMED_OUT) {

Review comment:
       Thanks for your advice. I will update the docs and the KIP later.
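With `TRANSACTION_TIMED_OUT` added to this branch, the EndTxn error handling can be summarized by a hypothetical classifier like the one below; it is not the actual `TransactionManager` code. It assumes the shared branch makes these errors abortable with a required epoch bump when `canBumpEpoch()` is true and fatal otherwise, consistent with the PR description that the producer recovers by sending InitProducerId with the current epoch. The fatal cases follow the neighboring branches in these hunks, and everything else is left unmodeled.

```java
// Hypothetical stand-in for a subset of org.apache.kafka.common.protocol.Errors.
enum Errors {
    NONE,
    PRODUCER_FENCED,
    INVALID_PRODUCER_EPOCH,
    INVALID_TXN_STATE,
    UNKNOWN_PRODUCER_ID,
    INVALID_PRODUCER_ID_MAPPING,
    TRANSACTION_TIMED_OUT,
    COORDINATOR_NOT_AVAILABLE
}

// Hypothetical outcome type; the real TransactionManager mutates its own state instead.
enum EndTxnOutcome {
    COMPLETED,
    FATAL,
    ABORTABLE_WITH_EPOCH_BUMP,
    NOT_MODELED
}

final class EndTxnErrorHandling {
    private EndTxnErrorHandling() {}

    static EndTxnOutcome classify(Errors error, boolean canBumpEpoch) {
        if (error == Errors.NONE)
            return EndTxnOutcome.COMPLETED;
        // Fencing and invalid state cannot be recovered from in this producer instance.
        if (error == Errors.PRODUCER_FENCED || error == Errors.INVALID_PRODUCER_EPOCH
                || error == Errors.INVALID_TXN_STATE)
            return EndTxnOutcome.FATAL;
        if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING
                || error == Errors.TRANSACTION_TIMED_OUT) {
            // Recoverable only if the coordinator supports bumping the epoch; otherwise fatal.
            return canBumpEpoch ? EndTxnOutcome.ABORTABLE_WITH_EPOCH_BUMP : EndTxnOutcome.FATAL;
        }
        // Retriable coordinator errors and the remaining cases are outside this sketch.
        return EndTxnOutcome.NOT_MODELED;
    }
}
```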







[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r505708598



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1198,18 +1209,22 @@ boolean canBumpEpoch() {
         return coordinatorSupportsBumpingEpoch;
     }
 
+    private void resetTransactions() {
+        newPartitionsInTransaction.clear();
+        pendingPartitionsInTransaction.clear();
+        partitionsInTransaction.clear();
+    }
+
     private void completeTransaction() {
         if (epochBumpRequired) {
             transitionTo(State.INITIALIZING);
         } else {
             transitionTo(State.READY);
         }
         lastError = null;
+        abortableError = null;
         epochBumpRequired = false;
-        transactionStarted = false;

Review comment:
       If this is the case, could we just remove this intermediate variable?
   ================================
   It could have been omitted, but this variable now has a new use when calling initTransactions.
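For reference, the reset that `completeTransaction` performs in this revision can be modeled as below. This is a simplified, hypothetical sketch: partitions are plain strings instead of `TopicPartition`, the state machine is reduced to two states, and calling `resetTransactions()` from `completeTransaction()` is an assumption, since the hunk does not show where the new helper is invoked. `transactionStarted` is omitted because its new role in `initTransactions` is not visible here.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified model of the fields this hunk touches.
final class TxnStateModel {
    enum State { READY, INITIALIZING }

    final Set<String> newPartitionsInTransaction = new HashSet<>();
    final Set<String> pendingPartitionsInTransaction = new HashSet<>();
    final Set<String> partitionsInTransaction = new HashSet<>();
    State state = State.READY;
    RuntimeException lastError;
    RuntimeException abortableError;
    boolean epochBumpRequired;

    private void resetTransactions() {
        newPartitionsInTransaction.clear();
        pendingPartitionsInTransaction.clear();
        partitionsInTransaction.clear();
    }

    void completeTransaction() {
        // A required epoch bump (e.g. after TRANSACTION_TIMED_OUT) routes through INITIALIZING
        // so the producer can re-send InitProducerId before starting a new transaction.
        state = epochBumpRequired ? State.INITIALIZING : State.READY;
        lastError = null;
        abortableError = null;
        epochBumpRequired = false;
        // Assumption: the hunk does not show where resetTransactions() is actually called.
        resetTransactions();
    }
}
```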



