You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/22 00:52:33 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r491655717



##########
File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the coordinator.
+ * When encountering this exception, the producer should retry initialization with current epoch.
+ */
+public class TransactionTimeoutException extends ApiException {

Review comment:
       `TransactionTimedOut`

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the coordinator.
+ * When encountering this exception, the producer should retry initialization with current epoch.
+ */
+public class TransactionTimeoutException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TransactionTimeoutException() {
+        super();
+    }
+
+    public TransactionTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public TransactionTimeoutException(String message) {

Review comment:
       Do we need all the other constructors?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1577,6 +1578,59 @@ public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn()
         verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
     }
 
+    @Test
+    public void testTxnTimeoutForAddPartitionsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, tp0, epoch, producerId);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutForAddOffsetsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(consumerGroupId));
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, consumerGroupId, producerId, epoch);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutInEndTxn() throws InterruptedException {
+
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+        prepareEndTxnResponse(Errors.TRANSACTION_TIMED_OUT, TransactionResult.COMMIT, producerId, epoch);
+
+        runUntil(commitResult::isCompleted);
+        runUntil(responseFuture::isDone);
+

Review comment:
       nit: remove empty line
   
   

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -1577,6 +1578,59 @@ public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn()
         verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
     }
 
+    @Test
+    public void testTxnTimeoutForAddPartitionsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, tp0, epoch, producerId);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutForAddOffsetsToTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(consumerGroupId));
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, consumerGroupId, producerId, epoch);
+
+        verifyTxnTimeout(responseFuture);
+    }
+
+    @Test
+    public void testTxnTimeoutInEndTxn() throws InterruptedException {
+

Review comment:
       nit: remove empty line

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1072,7 +1076,11 @@ private void transitionTo(State target, RuntimeException error) {
             if (error == null)
                 throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
             lastError = error;
+            abortableError = error;
         } else {
+            if (target != State.ABORTING_TRANSACTION) {

Review comment:
       Is this related to the optimization in https://issues.apache.org/jira/browse/KAFKA-10504?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##########
@@ -2959,8 +3035,7 @@ public void testBumpTransactionalEpochOnRecoverableAddPartitionRequestError() {
         assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
     }
 
-    @Test
-    public void testBumpTransactionalEpochOnRecoverableAddOffsetsRequestError() throws InterruptedException {
+    private void verifyBumpTransactionalEpochOnRecoverableAddOffsetsRequestError(Errors errors) throws InterruptedException {

Review comment:
       nit: this function could be placed after its callers

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1585,6 +1596,8 @@ public void handleResponse(AbstractResponse response) {
                 // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
                 // just treat it the same as PRODUCE_FENCED.
                 fatalError(Errors.PRODUCER_FENCED.exception());
+            } else if (error == Errors.TRANSACTION_TIMED_OUT) {

Review comment:
       Could we merge this with the other cases on L1452?

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {
+                Left(Errors.TRANSACTION_TIMED_OUT)
+              } else {
+                Left(Errors.PRODUCER_FENCED)
+              }
+            } else if (producerEpoch < txnMetadata.producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            } else if (txnMetadata.pendingTransitionInProgress
+              && !txnMetadata.pendingState.contains(PrepareEpochFence)
+              && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                   PrepareCommit
-                else
+                else {
                   PrepareAbort
-
-                if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
+                }
+                if (nextState == PrepareAbort && (txnMetadata.pendingState.get == PrepareEpochFence
+                  || txnMetadata.pendingState.get == PrepareEpochBumpThenAbort)) {
                   // We should clear the pending state to make way for the transition to PrepareAbort and also bump
                   // the epoch in the transaction metadata we are about to append.
-                  isEpochFence = true
+                  isEpochFence = txnMetadata.pendingState.get == PrepareEpochFence

Review comment:
       Correct me if I'm wrong, but it seems that the reason for adding `PrepareEpochBumpThenAbort` is to avoid setting `lastProducerEpoch` to -1 in the timed-out state? If so, I was wondering whether there is a simpler way to set `lastProducerEpoch` when we make the transition, just as in `prepareTransitionTo`.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2045,9 +2045,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
       def sendResponseCallback(error: Errors): Unit = {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val needOld = endTxnRequest.version < 2 && (error == Errors.PRODUCER_FENCED || error == Errors
+            .TRANSACTION_TIMED_OUT)
           val finalError =
-            if (endTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) {
-              // For older clients, they could not understand the new PRODUCER_FENCED error code,
+            if (needOld) {

Review comment:
       We should add a helper to return the old error code, such as `maybeReturnOldClientError(Errors error, short requestVersion)`.

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {
+                Left(Errors.TRANSACTION_TIMED_OUT)
+              } else {
+                Left(Errors.PRODUCER_FENCED)
+              }
+            } else if (producerEpoch < txnMetadata.producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+            } else if (txnMetadata.pendingTransitionInProgress
+              && !txnMetadata.pendingState.contains(PrepareEpochFence)
+              && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                   PrepareCommit
-                else
+                else {

Review comment:
       nit: could remove the braces




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org