Posted to commits@kafka.apache.org by jg...@apache.org on 2020/06/03 17:38:24 UTC

[kafka] branch trunk updated: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends (#8782)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ffbc6e  KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends (#8782)
0ffbc6e is described below

commit 0ffbc6e75fffb39d0dc387db534a51ce613f52af
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Jun 3 10:37:53 2020 -0700

    KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends (#8782)
    
    The method `maybeWriteTxnCompletion` is unsafe for concurrent calls. This can cause duplicate attempts to write the completion record to the log, which can ultimately lead to illegal state errors and possibly to correctness violations if another transaction had been started before the duplicate was written. This patch fixes the problem by ensuring only one thread can successfully remove the pending completion from the map.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
 .../TransactionMarkerChannelManager.scala          | 43 +++++------
 ...TransactionMarkerRequestCompletionHandler.scala |  2 +-
 .../TransactionMarkerChannelManagerTest.scala      | 87 +++++++++++++++++++---
 ...sactionMarkerRequestCompletionHandlerTest.scala |  2 +-
 4 files changed, 98 insertions(+), 36 deletions(-)
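
For context on the fix described in the commit message above, here is a minimal, hypothetical Scala sketch of the claim-by-atomic-remove pattern the patch applies in maybeWriteTxnCompletion. The names (ClaimOnceExample, PendingWork, pending, maybeComplete) are invented for illustration and are not the Kafka identifiers; the actual change is in the diff below.

    import java.util.concurrent.ConcurrentHashMap

    // Illustrative sketch only -- names and types are hypothetical, not Kafka code.
    object ClaimOnceExample {
      final case class PendingWork(id: String, payload: String)

      private val pending = new ConcurrentHashMap[String, PendingWork]()

      def submit(work: PendingWork): Unit =
        pending.put(work.id, work)

      // Safe to call from multiple threads once the work is ready to complete.
      // remove(key, value) deletes the entry only if it is still mapped to exactly
      // `work`, and returns true for at most one caller, so the non-idempotent
      // completion step below runs at most once per submitted entry.
      def maybeComplete(id: String)(complete: PendingWork => Unit): Unit = {
        Option(pending.get(id)).foreach { work =>
          if (pending.remove(id, work))
            complete(work)
        }
      }
    }

The point is that ConcurrentHashMap.remove(key, value) succeeds for at most one caller per entry, so even if several response handlers observe the same pending entry concurrently, only the thread that wins the removal goes on to write the completion record.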

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index dd8bb98..6fe2575 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -214,13 +214,11 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
   }
 
-  private def writeTxnCompletion(pendingCommitTxn: PendingCompleteTxn): Unit = {
-    transactionsWithPendingMarkers.remove(pendingCommitTxn.transactionalId)
-
-    val transactionalId = pendingCommitTxn.transactionalId
-    val txnMetadata = pendingCommitTxn.txnMetadata
-    val newMetadata = pendingCommitTxn.newMetadata
-    val coordinatorEpoch = pendingCommitTxn.coordinatorEpoch
+  private def writeTxnCompletion(pendingCompleteTxn: PendingCompleteTxn): Unit = {
+    val transactionalId = pendingCompleteTxn.transactionalId
+    val txnMetadata = pendingCompleteTxn.txnMetadata
+    val newMetadata = pendingCompleteTxn.newMetadata
+    val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
 
     trace(s"Completed sending transaction markers for $transactionalId; begin transition " +
       s"to ${newMetadata.txnState}")
@@ -242,7 +240,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
         if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
           debug(s"Sending $transactionalId's transaction markers for $txnMetadata with " +
             s"coordinator epoch $coordinatorEpoch succeeded, trying to append complete transaction log now")
-
           tryAppendToLog(PendingCompleteTxn(transactionalId, coordinatorEpoch, txnMetadata, newMetadata))
         } else {
           info(s"The cached metadata $txnMetadata has changed to $epochAndMetadata after " +
@@ -263,15 +260,13 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                           txnMetadata: TransactionMetadata,
                           newMetadata: TxnTransitMetadata): Unit = {
     val transactionalId = txnMetadata.transactionalId
-
-    val pendingCommitTxn = PendingCompleteTxn(
+    val pendingCompleteTxn = PendingCompleteTxn(
       transactionalId,
       coordinatorEpoch,
       txnMetadata,
-      newMetadata
-    )
+      newMetadata)
 
-    transactionsWithPendingMarkers.put(transactionalId, pendingCommitTxn)
+    transactionsWithPendingMarkers.put(transactionalId, pendingCompleteTxn)
     addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId,
       txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
     maybeWriteTxnCompletion(transactionalId)
@@ -285,15 +280,16 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
   }
 
-  private def maybeWriteTxnCompletion(transactionalId: String): Unit = {
-    Option(transactionsWithPendingMarkers.get(transactionalId)).foreach { pendingCommitTxn =>
-      if (!hasPendingMarkersToWrite(pendingCommitTxn.txnMetadata)) {
-        writeTxnCompletion(pendingCommitTxn)
+  def maybeWriteTxnCompletion(transactionalId: String): Unit = {
+    Option(transactionsWithPendingMarkers.get(transactionalId)).foreach { pendingCompleteTxn =>
+      if (!hasPendingMarkersToWrite(pendingCompleteTxn.txnMetadata) &&
+          transactionsWithPendingMarkers.remove(transactionalId, pendingCompleteTxn)) {
+        writeTxnCompletion(pendingCompleteTxn)
       }
     }
   }
 
-  private def tryAppendToLog(txnLogAppend: PendingCompleteTxn) = {
+  private def tryAppendToLog(txnLogAppend: PendingCompleteTxn): Unit = {
     // try to append to the transaction log
     def appendCallback(error: Errors): Unit =
       error match {
@@ -404,18 +400,17 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   def removeMarkersForTxnId(transactionalId: String): Unit = {
     transactionsWithPendingMarkers.remove(transactionalId)
   }
-
-  def completeSendMarkersForTxnId(transactionalId: String): Unit = {
-    maybeWriteTxnCompletion(transactionalId)
-  }
 }
 
 case class TxnIdAndMarkerEntry(txnId: String, txnMarkerEntry: TxnMarkerEntry)
 
-case class PendingCompleteTxn(transactionalId: String, coordinatorEpoch: Int, txnMetadata: TransactionMetadata, newMetadata: TxnTransitMetadata) {
+case class PendingCompleteTxn(transactionalId: String,
+                              coordinatorEpoch: Int,
+                              txnMetadata: TransactionMetadata,
+                              newMetadata: TxnTransitMetadata) {
 
   override def toString: String = {
-    "TxnLogAppend(" +
+    "PendingCompleteTxn(" +
       s"transactionalId=$transactionalId, " +
       s"coordinatorEpoch=$coordinatorEpoch, " +
       s"txnMetadata=$txnMetadata, " +
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 263e155..66edc47 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -193,7 +193,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                   txnMarker.coordinatorEpoch,
                   retryPartitions.toSet)
               } else {
-                txnMarkerChannelManager.completeSendMarkersForTxnId(transactionalId)
+                txnMarkerChannelManager.maybeWriteTxnCompletion(transactionalId)
               }
             }
         }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 46d9c9d..f01caa7 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -16,7 +16,10 @@
  */
 package kafka.coordinator.transaction
 
+import java.util
 import java.util.Arrays.asList
+import java.util.Collections
+import java.util.concurrent.{Callable, Executors, Future}
 
 import kafka.common.RequestAndCompletionHandler
 import kafka.metrics.KafkaYammerMetrics
@@ -34,11 +37,12 @@ import org.junit.Test
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
+import scala.util.Try
 
 class TransactionMarkerChannelManagerTest {
   private val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
   private val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
-  private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+  private val txnStateManager: TransactionStateManager = EasyMock.mock(classOf[TransactionStateManager])
 
   private val partition1 = new TopicPartition("topic1", 0)
   private val partition2 = new TopicPartition("topic1", 1)
@@ -87,6 +91,70 @@ class TransactionMarkerChannelManagerTest {
   }
 
   @Test
+  def shouldOnlyWriteTxnCompletionOnce(): Unit = {
+    mockCache()
+
+    val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition1.topic),
+      EasyMock.eq(partition1.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
+
+    EasyMock.expect(txnStateManager.appendTransactionToLog(
+      EasyMock.eq(transactionalId2),
+      EasyMock.eq(coordinatorEpoch),
+      EasyMock.eq(expectedTransition),
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
+      .andAnswer(() => {
+        txnMetadata2.completeTransitionTo(expectedTransition)
+        capturedErrorsCallback.getValue.apply(Errors.NONE)
+      }).once()
+
+    EasyMock.replay(txnStateManager, metadataCache)
+
+    var addMarkerFuture: Future[Try[Unit]] = null
+    val executor = Executors.newFixedThreadPool(1)
+    txnMetadata2.lock.lock()
+    try {
+      addMarkerFuture = executor.submit((() => {
+        Try(channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+            txnMetadata2, expectedTransition))
+      }): Callable[Try[Unit]])
+
+      val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+      val response = new WriteTxnMarkersResponse(
+        Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE)))
+      val clientResponse = new ClientResponse(header, null, null,
+        time.milliseconds(), time.milliseconds(), false, null, null,
+        response)
+
+      TestUtils.waitUntilTrue(() => {
+        val requests = channelManager.drainQueuedTransactionMarkers()
+        if (requests.nonEmpty) {
+          assertEquals(1, requests.size)
+          val request = requests.head
+          request.handler.onComplete(clientResponse)
+          true
+        } else {
+          false
+        }
+      }, "Timed out waiting for expected WriteTxnMarkers request")
+    } finally {
+      txnMetadata2.lock.unlock()
+      executor.shutdown()
+    }
+
+    assertNotNull(addMarkerFuture)
+    assertTrue("Add marker task failed with exception " + addMarkerFuture.get().get,
+      addMarkerFuture.get().isSuccess)
+
+    EasyMock.verify(txnStateManager)
+  }
+
+  @Test
   def shouldGenerateEmptyMapWhenNoRequestsOutstanding(): Unit = {
     assertTrue(channelManager.generateRequests().isEmpty)
   }
@@ -153,7 +221,6 @@ class TransactionMarkerChannelManagerTest {
     EasyMock.replay(metadataCache)
 
     channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
-    channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
     assertEquals(1, channelManager.numTxnsWithPendingMarkers)
     assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
@@ -291,7 +358,7 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1),
         null, null, 0, 0, false, null, null, response))
     }
 
@@ -338,7 +405,7 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1),
         null, null, 0, 0, false, null, null, response))
     }
 
@@ -387,7 +454,7 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1),
         null, null, 0, 0, false, null, null, response))
     }
 
@@ -402,7 +469,7 @@ class TransactionMarkerChannelManagerTest {
     assertEquals(CompleteCommit, txnMetadata2.state)
   }
 
-  private def createPidErrorMap(errors: Errors) = {
+  private def createPidErrorMap(errors: Errors): util.HashMap[java.lang.Long, util.Map[TopicPartition, Errors]] = {
     val pidMap = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
     val errorsMap = new java.util.HashMap[TopicPartition, Errors]()
     errorsMap.put(partition1, errors)
@@ -414,11 +481,11 @@ class TransactionMarkerChannelManagerTest {
   def shouldCreateMetricsOnStarting(): Unit = {
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
 
-    assertEquals(1, metrics.filter { case (k, _) =>
+    assertEquals(1, metrics.count { case (k, _) =>
       k.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize"
-    }.size)
-    assertEquals(1, metrics.filter { case (k, _) =>
+    })
+    assertEquals(1, metrics.count { case (k, _) =>
       k.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=LogAppendRetryQueueSize"
-    }.size)
+    })
   }
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 15b7a9e..5ae961b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -233,7 +233,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private def verifyCompleteDelayedOperationOnError(error: Errors): Unit = {
 
     var completed = false
-    EasyMock.expect(markerChannelManager.completeSendMarkersForTxnId(transactionalId))
+    EasyMock.expect(markerChannelManager.maybeWriteTxnCompletion(transactionalId))
       .andAnswer(() => completed = true)
       .once()
     EasyMock.replay(markerChannelManager)