You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/24 21:39:08 UTC

[kafka] branch 2.5 updated: KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 5a15676  KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336)
5a15676 is described below

commit 5a1567631d78575b86136d741f781b475106c0b9
Author: Bob Barrett <bo...@confluent.io>
AuthorDate: Tue Mar 24 14:27:53 2020 -0700

    KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336)
    
    When handling a WriteTxnResponse, the TransactionMarkerRequestCompletionHandler throws an IllegalStateException when the remote broker responds with a KAFKA_STORAGE_ERROR, and it does not retry the request. This leaves the transaction state stuck in PendingAbort or PendingCommit, with no way to change that state other than restarting the broker, because both EndTxnRequest and InitProducerIdRequest return CONCURRENT_TRANSACTIONS if the state is PendingAbort or PendingCommit. This patch changes the handler to treat KAFKA_STORAGE_ERROR as a retriable error, so the transaction marker request for the affected partition is retried with the current coordinator epoch instead of failing.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../transaction/TransactionMarkerRequestCompletionHandler.scala      | 3 ++-
 .../transaction/TransactionMarkerRequestCompletionHandlerTest.scala  | 5 +++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index fefe767..f655770 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -146,7 +146,8 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                          Errors.NOT_LEADER_FOR_PARTITION |
                          Errors.NOT_ENOUGH_REPLICAS |
                          Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
-                         Errors.REQUEST_TIMED_OUT => // these are retriable errors
+                         Errors.REQUEST_TIMED_OUT |
+                         Errors.KAFKA_STORAGE_ERROR => // these are retriable errors
 
                       info(s"Sending $transactionalId's transaction marker for partition $topicPartition has failed with error ${error.exceptionName}, retrying " +
                         s"with current coordinator epoch ${epochAndMetadata.coordinatorEpoch}")
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 0de9c6f..15b7a9e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -192,6 +192,11 @@ class TransactionMarkerRequestCompletionHandlerTest {
   }
 
   @Test
+  def shouldRetryPartitionWhenKafkaStorageError(): Unit = {
+    verifyRetriesPartitionOnError(Errors.KAFKA_STORAGE_ERROR)
+  }
+
+  @Test
   def shouldRemoveTopicPartitionFromWaitingSetOnUnsupportedForMessageFormat(): Unit = {
     mockCache()
     verifyCompleteDelayedOperationOnError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)