You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/12/23 23:49:42 UTC

[kafka] branch 2.2 updated: KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 65c91fe  KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840)
65c91fe is described below

commit 65c91fe25238f576287f5086a74eceefe724c875
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Mon Dec 23 15:20:40 2019 -0800

    KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840)
    
    Allow transaction metadata to be reloaded, even if it already exists as of a previous epoch. This helps with cases where a previous become-follower transition failed to unload corresponding metadata.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../transaction/TransactionCoordinator.scala       | 14 +++--
 .../transaction/TransactionStateManager.scala      | 62 +++++++++++-----------
 .../transaction/TransactionStateManagerTest.scala  | 28 +++++++++-
 3 files changed, 68 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 9d4eed6..437caeb 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -274,13 +274,19 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
-  def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
+  def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    // The operations performed during immigration must be resilient to any previous errors we saw or partial state we
+    // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue
+    // loading it.
+    txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
+
+    // Now load the partition.
     txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
   }
 
-  def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
-      txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
-      txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
+  def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
+    txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
   }
 
   private def logInvalidStateTransitionAndReturnError(transactionalId: String,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index b45953f..bd6d487 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -84,13 +84,13 @@ class TransactionStateManager(brokerId: Int,
   private val stateLock = new ReentrantReadWriteLock()
 
   /** partitions of transaction topic that are being loaded, state lock should be called BEFORE accessing this set */
-  private val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
+  private[transaction] val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
 
   /** partitions of transaction topic that are being removed, state lock should be called BEFORE accessing this set */
-  private val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
+  private[transaction] val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
 
   /** transaction metadata cache indexed by assigned transaction topic partition ids */
-  private val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
+  private[transaction] val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
 
   /** number of partitions for the transaction log topic */
   private val transactionTopicPartitionCount = getTransactionTopicPartitionCount
@@ -352,31 +352,25 @@ class TransactionStateManager(brokerId: Int,
 
   /**
    * Add a transaction topic partition into the cache
-   *
-   * Make it package-private to be used only for unit tests.
    */
-  private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int, coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String, TransactionMetadata]): Unit = {
-    val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch, metadataPerTransactionalId)
-    val currentTxnMetadataCacheEntry = transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry)
-
-    if (currentTxnMetadataCacheEntry.isDefined) {
-      val coordinatorEpoch = currentTxnMetadataCacheEntry.get.coordinatorEpoch
-      val metadataPerTxnId = currentTxnMetadataCacheEntry.get.metadataPerTransactionalId
-      val errorMsg = s"The metadata cache for txn partition $txnTopicPartition has already exist with epoch $coordinatorEpoch " +
-        s"and ${metadataPerTxnId.size} entries while trying to add to it; " +
-        s"this should not happen"
-      fatal(errorMsg)
-      throw new IllegalStateException(errorMsg)
+  private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int,
+                                                        coordinatorEpoch: Int,
+                                                        loadedTransactions: Pool[String, TransactionMetadata]): Unit = {
+    val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch, loadedTransactions)
+    val previousTxnMetadataCacheEntryOpt = transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry)
+
+    previousTxnMetadataCacheEntryOpt.foreach { previousTxnMetadataCacheEntry =>
+      warn(s"Unloaded transaction metadata $previousTxnMetadataCacheEntry from $txnTopicPartition as part of " +
+        s"loading metadata at epoch $coordinatorEpoch")
     }
   }
 
   /**
-   * When this broker becomes a leader for a transaction log partition, load this partition and
-   * populate the transaction metadata cache with the transactional ids.
+   * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction
+   * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from
+   * the previous loading / unloading operation.
    */
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback) {
-    validateTransactionTopicPartitionCountIsStable()
-
+  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
     val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
 
@@ -385,8 +379,10 @@ class TransactionStateManager(brokerId: Int,
       loadingPartitions.add(partitionAndLeaderEpoch)
     }
 
-    def loadTransactions() {
-      info(s"Loading transaction metadata from $topicPartition")
+    def loadTransactions(): Unit = {
+      info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch")
+      validateTransactionTopicPartitionCountIsStable()
+
       val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
 
       inWriteLock(stateLock) {
@@ -421,6 +417,8 @@ class TransactionStateManager(brokerId: Int,
           }
         }
       }
+
+      info(s"Completed loading transaction metadata from $topicPartition for coordinator epoch $coordinatorEpoch")
     }
 
     scheduler.schedule(s"load-txns-for-partition-$topicPartition", () => loadTransactions)
@@ -430,9 +428,7 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a follower for a transaction log partition, clear out the cache for corresponding transactional ids
    * that belong to that partition.
    */
-  def removeTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int) {
-    validateTransactionTopicPartitionCountIsStable()
-
+  def removeTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int): Unit = {
     val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
 
@@ -446,11 +442,10 @@ class TransactionStateManager(brokerId: Int,
         if (leavingPartitions.contains(partitionAndLeaderEpoch)) {
           transactionMetadataCache.remove(partitionId) match {
             case Some(txnMetadataCacheEntry) =>
-              info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition")
+              info(s"Unloaded transaction metadata $txnMetadataCacheEntry for $topicPartition on become-follower transition")
 
             case None =>
-              info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there is no entries remaining; " +
-                s"it is likely that another process for removing the cached entries has just executed earlier before")
+              info(s"No cached transaction metadata found for $topicPartition during become-follower transition")
           }
 
           leavingPartitions.remove(partitionAndLeaderEpoch)
@@ -643,7 +638,12 @@ class TransactionStateManager(brokerId: Int,
 }
 
 
-private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String, TransactionMetadata])
+private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
+                                                      metadataPerTransactionalId: Pool[String, TransactionMetadata]) {
+  override def toString: String = {
+    s"TxnMetadataCacheEntry(coordinatorEpoch=$coordinatorEpoch, numTransactionalEntries=${metadataPerTransactionalId.size})"
+  }
+}
 
 private[transaction] case class CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int, transactionMetadata: TransactionMetadata)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 34a37be..830e2c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -463,7 +463,33 @@ class TransactionStateManagerTest {
     verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
-  private def verifyMetadataDoesExistAndIsUsable(transactionalId: String) = {
+  @Test
+  def testSuccessfulReimmigration(): Unit = {
+    txnMetadata1.state = PrepareCommit
+    txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
+      new TopicPartition("topic1", 1)))
+
+    txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
+    val startOffset = 0L
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*)
+
+    prepareTxnLog(topicPartition, 0, records)
+
+    // immigrate partition at epoch 0
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _, _) => ())
+    assertEquals(0, transactionManager.loadingPartitions.size)
+    assertEquals(0, transactionManager.leavingPartitions.size)
+
+    // Re-immigrate partition at epoch 1. This should be successful even though we didn't get to emigrate the partition.
+    prepareTxnLog(topicPartition, 0, records)
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 1, (_, _, _, _, _) => ())
+    assertEquals(0, transactionManager.loadingPartitions.size)
+    assertEquals(0, transactionManager.leavingPartitions.size)
+    assertTrue(transactionManager.transactionMetadataCache.get(partitionId).isDefined)
+    assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
+  }
+
+  private def verifyMetadataDoesExistAndIsUsable(transactionalId: String): Unit = {
     transactionManager.getTransactionState(transactionalId) match {
       case Left(errors) => fail("shouldn't have been any errors")
       case Right(None) => fail("metadata should have been removed")