You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/22 18:36:43 UTC
[kafka] branch 2.0 updated: KAFKA-7519 Clear pending transaction
state when expiration fails (#5820)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new cacfe90 KAFKA-7519 Clear pending transaction state when expiration fails (#5820)
cacfe90 is described below
commit cacfe90b952bc0110024cb001946ff17d1f006be
Author: Bridger Howell <32...@users.noreply.github.com>
AuthorDate: Mon Oct 22 06:14:32 2018 -0600
KAFKA-7519 Clear pending transaction state when expiration fails (#5820)
Make sure that the transaction state is properly cleared when the
`transactionalId-expiration` task fails. Otherwise, operations on that
transactional id would return a `CONCURRENT_TRANSACTIONS` error, and the
id would appear "untouchable" to transaction state changes, preventing
transactional producers from operating until a broker restart or
transaction coordinator change.
Unit tested by verifying that the `transactionalId-expiration` task
won't leave the transaction metadata in a pending state if the replica
manager returns an error.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../transaction/TransactionStateManager.scala | 49 ++++++++++------------
.../transaction/TransactionStateManagerTest.scala | 27 ++++++------
2 files changed, 36 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 50d96c3..2a4abb4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -166,37 +166,32 @@ class TransactionStateManager(brokerId: Int,
(topicPartition, records)
}
-
def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
responses.foreach { case (topicPartition, response) =>
- response.error match {
- case Errors.NONE =>
- inReadLock(stateLock) {
- val toRemove = transactionalIdByPartition(topicPartition.partition())
- transactionMetadataCache.get(topicPartition.partition)
- .foreach { txnMetadataCacheEntry =>
- toRemove.foreach { idCoordinatorEpochAndMetadata =>
- val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
- txnMetadata.inLock {
- if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
- && txnMetadata.pendingState.contains(Dead)
- && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
- )
- txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId)
- else {
- debug(s"failed to remove expired transactionalId: ${idCoordinatorEpochAndMetadata.transactionalId}" +
- s" from cache. pendingState: ${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" +
- s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" +
- s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " +
- s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
- txnMetadata.pendingState = None
- }
- }
- }
+ inReadLock(stateLock) {
+ val toRemove = transactionalIdByPartition(topicPartition.partition)
+ transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
+ toRemove.foreach { idCoordinatorEpochAndMetadata =>
+ val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
+ val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
+ txnMetadata.inLock {
+ if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
+ && txnMetadata.pendingState.contains(Dead)
+ && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
+ && response.error == Errors.NONE) {
+ txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
+ } else {
+ warn(s"Failed to remove expired transactionalId: $transactionalId" +
+ s" from cache. Tombstone append error code: ${response.error}," +
+ s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
+ s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
+ s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
+ s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
+ txnMetadata.pendingState = None
}
+ }
}
- case _ =>
- debug(s"writing transactionalId tombstones for partition: ${topicPartition.partition} failed with error: ${response.error.message()}")
+ }
}
}
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 74bbe33..ff2cbcf 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -419,56 +419,57 @@ class TransactionStateManagerTest {
def shouldRemoveCompleteCommmitExpiredTransactionalIds(): Unit = {
setupAndRunTransactionalIdExpiration(Errors.NONE, CompleteCommit)
verifyMetadataDoesntExist(transactionalId1)
- verifyMetadataDoesExist(transactionalId2)
+ verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
@Test
def shouldRemoveCompleteAbortExpiredTransactionalIds(): Unit = {
setupAndRunTransactionalIdExpiration(Errors.NONE, CompleteAbort)
verifyMetadataDoesntExist(transactionalId1)
- verifyMetadataDoesExist(transactionalId2)
+ verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
@Test
def shouldRemoveEmptyExpiredTransactionalIds(): Unit = {
setupAndRunTransactionalIdExpiration(Errors.NONE, Empty)
verifyMetadataDoesntExist(transactionalId1)
- verifyMetadataDoesExist(transactionalId2)
+ verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
@Test
def shouldNotRemoveExpiredTransactionalIdsIfLogAppendFails(): Unit = {
setupAndRunTransactionalIdExpiration(Errors.NOT_ENOUGH_REPLICAS, CompleteAbort)
- verifyMetadataDoesExist(transactionalId1)
- verifyMetadataDoesExist(transactionalId2)
+ verifyMetadataDoesExistAndIsUsable(transactionalId1)
+ verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
@Test
def shouldNotRemoveOngoingTransactionalIds(): Unit = {
setupAndRunTransactionalIdExpiration(Errors.NONE, Ongoing)
- verifyMetadataDoesExist(transactionalId1)
- verifyMetadataDoesExist(transactionalId2)
+ verifyMetadataDoesExistAndIsUsable(transactionalId1)
+ verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
@Test
def shouldNotRemovePrepareAbortTransactionalIds(): Unit = {
setupAndRunTransactionalIdExpiration(Errors.NONE, PrepareAbort)
- verifyMetadataDoesExist(transactionalId1)
- verifyMetadataDoesExist(transactionalId2)
+ verifyMetadataDoesExistAndIsUsable(transactionalId1)
+ verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
@Test
def shouldNotRemovePrepareCommitTransactionalIds(): Unit = {
setupAndRunTransactionalIdExpiration(Errors.NONE, PrepareCommit)
- verifyMetadataDoesExist(transactionalId1)
- verifyMetadataDoesExist(transactionalId2)
+ verifyMetadataDoesExistAndIsUsable(transactionalId1)
+ verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
- private def verifyMetadataDoesExist(transactionalId: String) = {
+ private def verifyMetadataDoesExistAndIsUsable(transactionalId: String) = {
transactionManager.getTransactionState(transactionalId) match {
case Left(errors) => fail("shouldn't have been any errors")
case Right(None) => fail("metadata should have been removed")
- case Right(Some(metadata)) => // ok
+ case Right(Some(metadata)) =>
+ assertTrue("metadata shouldn't be in a pending state", metadata.transactionMetadata.pendingState.isEmpty)
}
}