You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/22 18:36:43 UTC

[kafka] branch 2.0 updated: KAFKA-7519 Clear pending transaction state when expiration fails (#5820)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new cacfe90  KAFKA-7519 Clear pending transaction state when expiration fails (#5820)
cacfe90 is described below

commit cacfe90b952bc0110024cb001946ff17d1f006be
Author: Bridger Howell <32...@users.noreply.github.com>
AuthorDate: Mon Oct 22 06:14:32 2018 -0600

    KAFKA-7519 Clear pending transaction state when expiration fails (#5820)
    
    Make sure that the transaction state is properly cleared when the
    `transactionalId-expiration` task fails. Operations on that transactional
    id would otherwise return a `CONCURRENT_TRANSACTIONS` error
    and appear "untouchable" to transaction state changes, preventing
    transactional producers from operating until a broker restart or
    transaction coordinator change.
    
    Unit tested by verifying that running the `transactionalId-expiration` task
    won't leave the transaction metadata in a pending state if the replica
    manager returns an error.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../transaction/TransactionStateManager.scala      | 49 ++++++++++------------
 .../transaction/TransactionStateManagerTest.scala  | 27 ++++++------
 2 files changed, 36 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 50d96c3..2a4abb4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -166,37 +166,32 @@ class TransactionStateManager(brokerId: Int,
             (topicPartition, records)
           }
 
-
         def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
           responses.foreach { case (topicPartition, response) =>
-            response.error match {
-              case Errors.NONE =>
-                inReadLock(stateLock) {
-                  val toRemove = transactionalIdByPartition(topicPartition.partition())
-                  transactionMetadataCache.get(topicPartition.partition)
-                    .foreach { txnMetadataCacheEntry =>
-                      toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                        val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
-                        txnMetadata.inLock {
-                          if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                            && txnMetadata.pendingState.contains(Dead)
-                            && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                          )
-                            txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId)
-                          else {
-                             debug(s"failed to remove expired transactionalId: ${idCoordinatorEpochAndMetadata.transactionalId}" +
-                               s" from cache. pendingState: ${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" +
-                               s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" +
-                               s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " +
-                               s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                            txnMetadata.pendingState = None
-                          }
-                        }
-                      }
+            inReadLock(stateLock) {
+              val toRemove = transactionalIdByPartition(topicPartition.partition)
+              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
+                toRemove.foreach { idCoordinatorEpochAndMetadata =>
+                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
+                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
+                  txnMetadata.inLock {
+                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
+                      && txnMetadata.pendingState.contains(Dead)
+                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
+                      && response.error == Errors.NONE) {
+                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
+                    } else {
+                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
+                        s" from cache. Tombstone append error code: ${response.error}," +
+                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
+                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
+                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
+                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
+                      txnMetadata.pendingState = None
                     }
+                  }
                 }
-              case _ =>
-                debug(s"writing transactionalId tombstones for partition: ${topicPartition.partition} failed with error: ${response.error.message()}")
+              }
             }
           }
         }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 74bbe33..ff2cbcf 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -419,56 +419,57 @@ class TransactionStateManagerTest {
   def shouldRemoveCompleteCommmitExpiredTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, CompleteCommit)
     verifyMetadataDoesntExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldRemoveCompleteAbortExpiredTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, CompleteAbort)
     verifyMetadataDoesntExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldRemoveEmptyExpiredTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, Empty)
     verifyMetadataDoesntExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldNotRemoveExpiredTransactionalIdsIfLogAppendFails(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NOT_ENOUGH_REPLICAS, CompleteAbort)
-    verifyMetadataDoesExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId1)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldNotRemoveOngoingTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, Ongoing)
-    verifyMetadataDoesExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId1)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldNotRemovePrepareAbortTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, PrepareAbort)
-    verifyMetadataDoesExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId1)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldNotRemovePrepareCommitTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, PrepareCommit)
-    verifyMetadataDoesExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId1)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
-  private def verifyMetadataDoesExist(transactionalId: String) = {
+  private def verifyMetadataDoesExistAndIsUsable(transactionalId: String) = {
     transactionManager.getTransactionState(transactionalId) match {
       case Left(errors) => fail("shouldn't have been any errors")
       case Right(None) => fail("metadata should have been removed")
-      case Right(Some(metadata)) => // ok
+      case Right(Some(metadata)) =>
+        assertTrue("metadata shouldn't be in a pending state", metadata.transactionMetadata.pendingState.isEmpty)
     }
   }