Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/30 21:57:35 UTC

kafka git commit: KAFKA-5202: Handle topic deletion while trying to send txn markers

Repository: kafka
Updated Branches:
  refs/heads/trunk f0745cd51 -> 80223b14e


KAFKA-5202: Handle topic deletion while trying to send txn markers

Here is a sketch of the proposal:

1. When it is time to send the txn markers, look up the leader node of each partition only once instead of retrying. If that information is not available, the partition has very likely been deleted, since it was in the cache before. In this case, just remove the partition from the transaction metadata and skip putting its marker into the corresponding queue; if none of the partitions has an available leader broker, complete the delayed operation so we can proceed to write the final txn log entry.

2. If the leader id is known from the cache but the corresponding node object with the listener name is not available, the leader is likely just unavailable right now. Put the marker into a separate queue and let the sender thread retry fetching its metadata each time it drains the queue (both rules are illustrated in the sketch below).

One caveat of this approach is the delete-and-recreate case. The argument is that since all messages are deleted anyway when the topic partition is deleted, it does not matter whether the markers ever make it onto the log partitions.
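
To make the routing concrete, here is a minimal, self-contained Scala sketch of both rules (hypothetical names; Int broker ids with -1 standing in for Node.noNode, not the actual Kafka classes):

import scala.collection.mutable

object TxnMarkerRoutingSketch {
  // lookup(partition) mirrors the new MetadataCache contract:
  //   None     -> leader unknown (partition likely deleted): skip, let the delayed op complete
  //   Some(-1) -> leader known but node unavailable (stand-in for Node.noNode): park and retry
  //   Some(id) -> leader node available: enqueue for that broker
  private val brokerQueues = mutable.Map.empty[Int, mutable.Queue[String]]
  private val unknownBrokerQueue = mutable.Queue.empty[String]

  def route(partition: String, lookup: String => Option[Int]): Unit =
    lookup(partition) match {
      case Some(-1) => unknownBrokerQueue.enqueue(partition)  // rule 2: park and retry
      case Some(id) => brokerQueues.getOrElseUpdate(id, mutable.Queue.empty).enqueue(partition)
      case None     => ()                                     // rule 1: skip the partition
    }

  // Each drain first re-resolves everything parked for an unknown broker,
  // just like drainQueuedTransactionMarkers does in the patch below.
  def drain(lookup: String => Option[Int]): Map[Int, Seq[String]] = {
    unknownBrokerQueue.dequeueAll(_ => true).foreach(route(_, lookup))
    brokerQueues.map { case (id, q) => id -> q.dequeueAll(_ => true).toSeq }.toMap
  }

  def main(args: Array[String]): Unit = {
    var leaderUp = false
    val lookup: String => Option[Int] =
      p => if (p == "t-0") (if (leaderUp) Some(1) else Some(-1)) else Some(2)

    route("t-0", lookup)
    route("t-1", lookup)
    println(drain(lookup))   // Map(2 -> List(t-1)); t-0 stays parked
    leaderUp = true
    println(drain(lookup))   // t-0 is now routed to broker 1
  }
}

The unknown-broker queue keeps the sender thread from busy-looping on metadata lookups, which is what the removed brokerNotAliveBackoffMs retry loop used to do.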

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Apurva Mehta <ap...@confluent.io>, Damian Guy <da...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3130 from guozhangwang/K5202-handle-topic-deletion


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/80223b14
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/80223b14
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/80223b14

Branch: refs/heads/trunk
Commit: 80223b14ee092e95d05b40b12631df2d6db7ef53
Parents: f0745cd
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue May 30 14:35:51 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue May 30 14:35:51 2017 -0700

----------------------------------------------------------------------
 .../TransactionMarkerChannelManager.scala       |  90 +++++++++---
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../main/scala/kafka/server/MetadataCache.scala |  26 ++--
 .../scala/kafka/server/ReplicaManager.scala     |   1 -
 .../TransactionMarkerChannelManagerTest.scala   | 146 ++++++++++++-------
 5 files changed, 178 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/80223b14/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 461867d..344863f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -128,10 +128,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
   private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = concurrent.TrieMap.empty[Int, TxnMarkerQueue]
 
-  private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
+  private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
 
-  // TODO: What is reasonable for this
-  private val brokerNotAliveBackoffMs = 10
+  private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
 
   private val txnMarkerSendThread: InterBrokerSendThread = {
     new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time)
@@ -156,6 +155,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   }
 
   // visible for testing
+  private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker
+
+  // visible for testing
   private[transaction] def senderThread = txnMarkerSendThread
 
   private[transaction] def addMarkersForBroker(broker: Node, txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry) {
@@ -171,6 +173,22 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   }
 
   private[transaction] def drainQueuedTransactionMarkers(): Iterable[RequestAndCompletionHandler] = {
+    val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
+    markersQueueForUnknownBroker.forEachTxnTopicPartition { case (_, queue) =>
+      queue.drainTo(txnIdAndMarkerEntries)
+    }
+
+    for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries.asScala) {
+      val transactionalId = txnIdAndMarker.txnId
+      val producerId = txnIdAndMarker.txnMarkerEntry.producerId
+      val producerEpoch = txnIdAndMarker.txnMarkerEntry.producerEpoch
+      val txnResult = txnIdAndMarker.txnMarkerEntry.transactionResult
+      val coordinatorEpoch = txnIdAndMarker.txnMarkerEntry.coordinatorEpoch
+      val topicPartitions = txnIdAndMarker.txnMarkerEntry.partitions.asScala.toSet
+
+      addTxnMarkersToBrokerQueue(transactionalId, producerId, producerEpoch, txnResult, coordinatorEpoch, topicPartitions)
+    }
+
     markersQueuePerBroker.map { case (brokerId: Int, brokerRequestQueue: TxnMarkerQueue) =>
       val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
       brokerRequestQueue.forEachTxnTopicPartition { case (_, queue) =>
@@ -254,31 +272,65 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                                  result: TransactionResult, coordinatorEpoch: Int,
                                  topicPartitions: immutable.Set[TopicPartition]): Unit = {
     val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
-    val partitionsByDestination: immutable.Map[Node, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
-      var brokerNode: Option[Node] = None
+    val partitionsByDestination: immutable.Map[Option[Node], immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
+      metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
+    }
 
-      // TODO: instead of retry until succeed, we can first put it into an unknown broker queue and let the sender thread to look for its broker and migrate them
-      while (brokerNode.isEmpty) {
-        brokerNode = metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
+    for ((broker: Option[Node], topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
+      broker match {
+        case Some(brokerNode) =>
+          val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
+          val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
+
+          if (brokerNode == Node.noNode) {
+            // if the leader of the partition is known but its node is not available, put the marker into the
+            // unknown-broker queue and let the sender thread look up its broker and migrate it later
+            markersQueueForUnknownBroker.addMarkers(txnTopicPartition, txnIdAndMarker)
+          } else {
+            addMarkersForBroker(brokerNode, txnTopicPartition, txnIdAndMarker)
+          }
 
-        if (brokerNode.isEmpty) {
-          trace(s"Couldn't find leader endpoint for partition: $topicPartition, retrying.")
-          time.sleep(brokerNotAliveBackoffMs)
-        }
-      }
-      brokerNode.get
-    }
+        case None =>
+          txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
+            case Left(error) =>
+              info(s"Encountered $error trying to fetch transaction metadata for $transactionalId with coordinator epoch $coordinatorEpoch; cancel sending markers to its partition leaders")
+              txnMarkerPurgatory.cancelForKey(transactionalId)
 
-    for ((broker: Node, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
-      val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
-      val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
-      addMarkersForBroker(broker, txnTopicPartition, txnIdAndMarker)
+            case Right(Some(epochAndMetadata)) =>
+              if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
+                info(s"The cached metadata has changed to $epochAndMetadata (old coordinator epoch is $coordinatorEpoch) since preparing to send markers; cancel sending markers to its partition leaders")
+                txnMarkerPurgatory.cancelForKey(transactionalId)
+              } else {
+                // if the leader of the partition is unknown, skip sending the txn marker since
+                // the partition is likely to be deleted already
+                info(s"Couldn't find leader endpoint for partitions $topicPartitions while trying to send transaction markers for " +
+                  s"$transactionalId, these partitions are likely deleted already and hence can be skipped")
+
+                val txnMetadata = epochAndMetadata.transactionMetadata
+
+                txnMetadata synchronized {
+                  topicPartitions.foreach(txnMetadata.removePartition)
+                }
+
+                txnMarkerPurgatory.checkAndComplete(transactionalId)
+              }
+
+            case Right(None) =>
+              throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
+                s"no metadata in the cache; this is not expected")
+          }
+      }
     }
 
     networkClient.wakeup()
   }
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
+    markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
+      for (entry: TxnIdAndMarkerEntry <- queue.asScala)
+        removeMarkersForTxnId(entry.txnId)
+    }
+
     markersQueuePerBroker.foreach { case(_, brokerQueue) =>
       brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
         for (entry: TxnIdAndMarkerEntry <- queue.asScala)

http://git-wip-us.apache.org/repos/asf/kafka/blob/80223b14/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dd6f18d..459cb27 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1558,7 +1558,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
         partitionsToAdd.asScala.partition { tp =>
-          authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
+          authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp)
         }
 
       val unauthorizedForWriteRequestInfo = existingAndAuthorizedForDescribeTopics.filterNot { tp =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/80223b14/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4e1cd37..466645b 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -160,20 +160,20 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  // if the leader is not known, return None;
+  // if the leader is known and the corresponding node is available, return Some(node);
+  // if the leader is known but the corresponding node for the listener name is not available, return Some(NO_NODE)
   def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
     inReadLock(partitionMetadataLock) {
-      cache.get(topic).flatMap(_.get(partitionId)) match {
-        case Some(partitionInfo) =>
-          val leaderId = partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
-          try {
-            getAliveEndpoint(leaderId, listenerName)
-          } catch {
-            case e: BrokerEndPointNotAvailableException =>
-              None
-          }
-
-        case None =>
-          None
+      cache.get(topic).flatMap(_.get(partitionId)) map { partitionInfo =>
+        val leaderId = partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+
+        aliveNodes.get(leaderId) match {
+          case Some(nodeMap) =>
+            nodeMap.getOrElse(listenerName, Node.noNode)
+          case None =>
+            Node.noNode
+        }
       }
     }
   }
@@ -235,6 +235,8 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
+
   private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
     cache.get(topic).map { infos =>
       infos.remove(partitionId)
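
For reference, a hedged usage sketch of the three-state contract documented above (assumes only kafka-clients on the classpath for org.apache.kafka.common.Node; names are illustrative):

import org.apache.kafka.common.Node

object LeaderEndpointContractSketch {
  // Mirrors getPartitionLeaderEndpoint's documented return states.
  def interpret(endpoint: Option[Node]): String = endpoint match {
    case None                              => "skip: partition has likely been deleted"
    case Some(node) if node == Node.noNode => "park: leader known but unreachable on this listener"
    case Some(node)                        => s"send markers to broker ${node.id}"
  }

  def main(args: Array[String]): Unit = {
    println(interpret(None))
    println(interpret(Some(Node.noNode)))
    println(interpret(Some(new Node(1, "broker1", 9092))))
  }
}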

http://git-wip-us.apache.org/repos/asf/kafka/blob/80223b14/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index cc5bfb0..5e1c9c1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -340,7 +340,6 @@ class ReplicaManager(val config: KafkaConfig,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     delayedProduceLock: Option[Object] = None) {
-
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,

http://git-wip-us.apache.org/repos/asf/kafka/blob/80223b14/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 991bfbe..4015a4f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -43,13 +43,16 @@ class TransactionMarkerChannelManagerTest {
   private val transactionalId2 = "txnId2"
   private val producerId1 = 0.asInstanceOf[Long]
   private val producerId2 = 1.asInstanceOf[Long]
-  private val producerId3 = 1.asInstanceOf[Long]
   private val producerEpoch = 0.asInstanceOf[Short]
   private val txnTopicPartition1 = 0
   private val txnTopicPartition2 = 1
   private val coordinatorEpoch = 0
   private val txnTimeoutMs = 0
   private val txnResult = TransactionResult.COMMIT
+  private val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerEpoch, txnTimeoutMs,
+    PrepareCommit, mutable.Set[TopicPartition](partition1, partition2), 0L, 0L)
+  private val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerEpoch, txnTimeoutMs,
+    PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
 
   private val txnMarkerPurgatory = new DelayedOperationPurgatory[DelayedTxnMarker]("txn-purgatory-name",
     new MockTimer,
@@ -73,6 +76,13 @@ class TransactionMarkerChannelManagerTest {
     EasyMock.expect(txnStateManager.partitionFor(transactionalId2))
       .andReturn(txnTopicPartition2)
       .anyTimes()
+    EasyMock.expect(txnStateManager.getAndMaybeAddTransactionState(EasyMock.eq(transactionalId1), EasyMock.anyObject[Option[TransactionMetadata]]()))
+      .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))))
+      .anyTimes()
+    EasyMock.expect(txnStateManager.getAndMaybeAddTransactionState(EasyMock.eq(transactionalId2), EasyMock.anyObject[Option[TransactionMetadata]]()))
+      .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
+      .anyTimes()
+
     EasyMock.replay(txnStateManager)
   }
 
@@ -82,39 +92,38 @@ class TransactionMarkerChannelManagerTest {
   }
 
   @Test
-  def shouldGenerateRequestPerBroker(): Unit = {
+  def shouldGenerateRequestPerPartitionPerBroker(): Unit = {
     mockCache()
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
       EasyMock.eq(partition1.partition),
-      EasyMock.anyObject()))
-      .andReturn(Some(broker1))
-      .anyTimes()
-
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition2.topic),
       EasyMock.eq(partition2.partition),
-      EasyMock.anyObject()))
-      .andReturn(Some(broker2))
-      .anyTimes()
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
 
     EasyMock.replay(metadataCache)
 
-    val txnMetadata = new TransactionMetadata(transactionalId1, producerId1, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1, partition2), 0L, 0L)
-    channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
+    channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
+    channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
-    assertEquals(1, txnMarkerPurgatory.watched)
-    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
-    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(2, txnMarkerPurgatory.watched)
+    assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
     assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
 
     val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, TransactionResult.COMMIT, Utils.mkList(partition1)))).build()
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)),
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)))).build()
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, TransactionResult.COMMIT, Utils.mkList(partition2)))).build()
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
 
     val requests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
       (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
@@ -125,59 +134,83 @@ class TransactionMarkerChannelManagerTest {
   }
 
   @Test
-  def shouldGenerateRequestPerPartitionPerBroker(): Unit = {
+  def shouldSkipSendMarkersWhenLeaderNotFound(): Unit = {
     mockCache()
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
       EasyMock.eq(partition1.partition),
-      EasyMock.anyObject()))
-      .andReturn(Some(broker1))
-      .anyTimes()
+      EasyMock.anyObject())
+    ).andReturn(None).anyTimes()
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
 
     EasyMock.replay(metadataCache)
 
-    val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
-    val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
     channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
-    assertEquals(2, txnMarkerPurgatory.watched)
-    assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
-    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
-    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
-
-    val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, TransactionResult.COMMIT, Utils.mkList(partition1)),
-        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, TransactionResult.COMMIT, Utils.mkList(partition1)))).build()
-
-    val requests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
-      (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
-    }.toMap
-
-    assertEquals(Map(broker1 -> expectedBroker1Request), requests)
-    assertTrue(senderThread.generateRequests().isEmpty)
+    assertEquals(1, txnMarkerPurgatory.watched)
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertTrue(channelManager.queueForBroker(broker1.id).isEmpty)
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
   }
 
   @Test
-  def shouldRetryGettingLeaderWhenNotFound(): Unit = {
+  def shouldSaveForLaterWhenLeaderUnknownButNotAvailable(): Unit = {
     mockCache()
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
       EasyMock.eq(partition1.partition),
       EasyMock.anyObject())
-    ).andReturn(None)
-     .andReturn(None)
-     .andReturn(Some(broker1))
+    ).andReturn(Some(Node.noNode))
+      .andReturn(Some(Node.noNode))
+      .andReturn(Some(Node.noNode))
+      .andReturn(Some(Node.noNode))
+      .andReturn(Some(broker1))
+      .andReturn(Some(broker1))
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
 
     EasyMock.replay(metadataCache)
 
-    channelManager.addTxnMarkersToBrokerQueue(transactionalId1, producerId1, producerEpoch, TransactionResult.COMMIT, coordinatorEpoch, Set[TopicPartition](partition1))
+    channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
+    channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
-    EasyMock.verify(metadataCache)
+    assertEquals(2, txnMarkerPurgatory.watched)
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertTrue(channelManager.queueForBroker(broker1.id).isEmpty)
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
+    assertEquals(2, channelManager.queueForUnknownBroker.totalNumMarkers())
+    assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition1))
+    assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2))
+
+    val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)),
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)))).build()
+    val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
+
+    val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+      (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
+    }.toMap
+
+    assertEquals(Map(broker2 -> expectedBroker2Request), firstDrainedRequests)
+
+    val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+      (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
+    }.toMap
+
+    assertEquals(Map(broker1 -> expectedBroker1Request), secondDrainedRequests)
   }
 
   @Test
@@ -187,24 +220,26 @@ class TransactionMarkerChannelManagerTest {
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
       EasyMock.eq(partition1.partition),
-      EasyMock.anyObject()))
-      .andReturn(Some(broker1))
-      .anyTimes()
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
 
     EasyMock.replay(metadataCache)
 
-    val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
     channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
-
-    val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
     assertEquals(2, txnMarkerPurgatory.watched)
     assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
 
     channelManager.removeMarkersForTxnTopicPartition(txnTopicPartition1)
 
@@ -212,5 +247,8 @@ class TransactionMarkerChannelManagerTest {
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
     assertEquals(0, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
   }
 }