You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy (see the original mailing-list archive page).
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/24 21:39:08 UTC
[kafka] branch 2.5 updated: KAFKA-9749;
Transaction coordinator should treat KAFKA_STORAGE_ERROR as
retriable (#8336)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 5a15676 KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336)
5a15676 is described below
commit 5a1567631d78575b86136d741f781b475106c0b9
Author: Bob Barrett <bo...@confluent.io>
AuthorDate: Tue Mar 24 14:27:53 2020 -0700
KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336)
When handling a WriteTxnResponse, the TransactionMarkerRequestCompletionHandler throws an IllegalStateException when the remote broker responds with a KAFKA_STORAGE_ERROR and does not retry the request. This leaves the transaction state stuck in PendingAbort or PendingCommit, with no way to change that state other than restarting the broker, because both EndTxnRequest and InitProducerIdRequest return CONCURRENT_TRANSACTIONS if the state is PendingAbort or PendingCommit. This patch cha [...]
Reviewers: Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
.../transaction/TransactionMarkerRequestCompletionHandler.scala | 3 ++-
.../transaction/TransactionMarkerRequestCompletionHandlerTest.scala | 5 +++++
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index fefe767..f655770 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -146,7 +146,8 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
Errors.NOT_LEADER_FOR_PARTITION |
Errors.NOT_ENOUGH_REPLICAS |
Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
- Errors.REQUEST_TIMED_OUT => // these are retriable errors
+ Errors.REQUEST_TIMED_OUT |
+ Errors.KAFKA_STORAGE_ERROR => // these are retriable errors
info(s"Sending $transactionalId's transaction marker for partition $topicPartition has failed with error ${error.exceptionName}, retrying " +
s"with current coordinator epoch ${epochAndMetadata.coordinatorEpoch}")
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 0de9c6f..15b7a9e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -192,6 +192,11 @@ class TransactionMarkerRequestCompletionHandlerTest {
}
@Test
+ def shouldRetryPartitionWhenKafkaStorageError(): Unit = {
+ verifyRetriesPartitionOnError(Errors.KAFKA_STORAGE_ERROR)
+ }
+
+ @Test
def shouldRemoveTopicPartitionFromWaitingSetOnUnsupportedForMessageFormat(): Unit = {
mockCache()
verifyCompleteDelayedOperationOnError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)