You are viewing a plain-text version of this content. The canonical link for it (originally hyperlinked on "here") was not preserved in this copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/12/23 23:49:42 UTC
[kafka] branch 2.2 updated: KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 65c91fe KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840)
65c91fe is described below
commit 65c91fe25238f576287f5086a74eceefe724c875
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Mon Dec 23 15:20:40 2019 -0800
KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840)
Allow transaction metadata to be reloaded, even if it already exists as of a previous epoch. This helps with cases where a previous become-follower transition failed to unload the corresponding metadata.
Reviewers: Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../transaction/TransactionCoordinator.scala | 14 +++--
.../transaction/TransactionStateManager.scala | 62 +++++++++++-----------
.../transaction/TransactionStateManagerTest.scala | 28 +++++++++-
3 files changed, 68 insertions(+), 36 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 9d4eed6..437caeb 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -274,13 +274,19 @@ class TransactionCoordinator(brokerId: Int,
}
}
- def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
+ def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+ // The operations performed during immigration must be resilient to any previous errors we saw or partial state we
+ // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue
+ // loading it.
+ txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
+
+ // Now load the partition.
txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
}
- def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
- txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
- txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
+ def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+ txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
+ txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
}
private def logInvalidStateTransitionAndReturnError(transactionalId: String,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index b45953f..bd6d487 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -84,13 +84,13 @@ class TransactionStateManager(brokerId: Int,
private val stateLock = new ReentrantReadWriteLock()
/** partitions of transaction topic that are being loaded, state lock should be called BEFORE accessing this set */
- private val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
+ private[transaction] val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
/** partitions of transaction topic that are being removed, state lock should be called BEFORE accessing this set */
- private val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
+ private[transaction] val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
/** transaction metadata cache indexed by assigned transaction topic partition ids */
- private val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
+ private[transaction] val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
/** number of partitions for the transaction log topic */
private val transactionTopicPartitionCount = getTransactionTopicPartitionCount
@@ -352,31 +352,25 @@ class TransactionStateManager(brokerId: Int,
/**
* Add a transaction topic partition into the cache
- *
- * Make it package-private to be used only for unit tests.
*/
- private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int, coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String, TransactionMetadata]): Unit = {
- val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch, metadataPerTransactionalId)
- val currentTxnMetadataCacheEntry = transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry)
-
- if (currentTxnMetadataCacheEntry.isDefined) {
- val coordinatorEpoch = currentTxnMetadataCacheEntry.get.coordinatorEpoch
- val metadataPerTxnId = currentTxnMetadataCacheEntry.get.metadataPerTransactionalId
- val errorMsg = s"The metadata cache for txn partition $txnTopicPartition has already exist with epoch $coordinatorEpoch " +
- s"and ${metadataPerTxnId.size} entries while trying to add to it; " +
- s"this should not happen"
- fatal(errorMsg)
- throw new IllegalStateException(errorMsg)
+ private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int,
+ coordinatorEpoch: Int,
+ loadedTransactions: Pool[String, TransactionMetadata]): Unit = {
+ val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch, loadedTransactions)
+ val previousTxnMetadataCacheEntryOpt = transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry)
+
+ previousTxnMetadataCacheEntryOpt.foreach { previousTxnMetadataCacheEntry =>
+ warn(s"Unloaded transaction metadata $previousTxnMetadataCacheEntry from $txnTopicPartition as part of " +
+ s"loading metadata at epoch $coordinatorEpoch")
}
}
/**
- * When this broker becomes a leader for a transaction log partition, load this partition and
- * populate the transaction metadata cache with the transactional ids.
+ * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction
+ * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from
+ * the previous loading / unloading operation.
*/
- def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback) {
- validateTransactionTopicPartitionCountIsStable()
-
+ def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@@ -385,8 +379,10 @@ class TransactionStateManager(brokerId: Int,
loadingPartitions.add(partitionAndLeaderEpoch)
}
- def loadTransactions() {
- info(s"Loading transaction metadata from $topicPartition")
+ def loadTransactions(): Unit = {
+ info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch")
+ validateTransactionTopicPartitionCountIsStable()
+
val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
inWriteLock(stateLock) {
@@ -421,6 +417,8 @@ class TransactionStateManager(brokerId: Int,
}
}
}
+
+ info(s"Completed loading transaction metadata from $topicPartition for coordinator epoch $coordinatorEpoch")
}
scheduler.schedule(s"load-txns-for-partition-$topicPartition", () => loadTransactions)
@@ -430,9 +428,7 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a follower for a transaction log partition, clear out the cache for corresponding transactional ids
* that belong to that partition.
*/
- def removeTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int) {
- validateTransactionTopicPartitionCountIsStable()
-
+ def removeTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int): Unit = {
val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@@ -446,11 +442,10 @@ class TransactionStateManager(brokerId: Int,
if (leavingPartitions.contains(partitionAndLeaderEpoch)) {
transactionMetadataCache.remove(partitionId) match {
case Some(txnMetadataCacheEntry) =>
- info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition")
+ info(s"Unloaded transaction metadata $txnMetadataCacheEntry for $topicPartition on become-follower transition")
case None =>
- info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there is no entries remaining; " +
- s"it is likely that another process for removing the cached entries has just executed earlier before")
+ info(s"No cached transaction metadata found for $topicPartition during become-follower transition")
}
leavingPartitions.remove(partitionAndLeaderEpoch)
@@ -643,7 +638,12 @@ class TransactionStateManager(brokerId: Int,
}
-private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String, TransactionMetadata])
+private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
+ metadataPerTransactionalId: Pool[String, TransactionMetadata]) {
+ override def toString: String = {
+ s"TxnMetadataCacheEntry(coordinatorEpoch=$coordinatorEpoch, numTransactionalEntries=${metadataPerTransactionalId.size})"
+ }
+}
private[transaction] case class CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int, transactionMetadata: TransactionMetadata)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 34a37be..830e2c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -463,7 +463,33 @@ class TransactionStateManagerTest {
verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
- private def verifyMetadataDoesExistAndIsUsable(transactionalId: String) = {
+ @Test
+ def testSuccessfulReimmigration(): Unit = {
+ txnMetadata1.state = PrepareCommit
+ txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
+ new TopicPartition("topic1", 1)))
+
+ txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
+ val startOffset = 0L
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*)
+
+ prepareTxnLog(topicPartition, 0, records)
+
+ // immigrate partition at epoch 0
+ transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _, _) => ())
+ assertEquals(0, transactionManager.loadingPartitions.size)
+ assertEquals(0, transactionManager.leavingPartitions.size)
+
+ // Re-immigrate partition at epoch 1. This should be successful even though we didn't get to emigrate the partition.
+ prepareTxnLog(topicPartition, 0, records)
+ transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 1, (_, _, _, _, _) => ())
+ assertEquals(0, transactionManager.loadingPartitions.size)
+ assertEquals(0, transactionManager.leavingPartitions.size)
+ assertTrue(transactionManager.transactionMetadataCache.get(partitionId).isDefined)
+ assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
+ }
+
+ private def verifyMetadataDoesExistAndIsUsable(transactionalId: String): Unit = {
transactionManager.getTransactionState(transactionalId) match {
case Left(errors) => fail("shouldn't have been any errors")
case Right(None) => fail("metadata should have been removed")