You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/03 00:18:08 UTC

kafka git commit: KAFKA-5131; WriteTxnMarkers and complete commit/abort on partition immigration [Forced Update!]

Repository: kafka
Updated Branches:
  refs/heads/trunk da68b1e39 -> 1cd01adcd (forced update)


KAFKA-5131; WriteTxnMarkers and complete commit/abort on partition immigration

Write txn markers and complete the commit/abort for transactions that are in
the PrepareCommit or PrepareAbort state during partition immigration.

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #2926 from dguy/kafka-5059


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd01adc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd01adc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd01adc

Branch: refs/heads/trunk
Commit: 1cd01adcd194343e252e09769ae43a5c517efa55
Parents: b2fcf73
Author: Damian Guy <da...@gmail.com>
Authored: Wed May 3 01:01:39 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 3 01:17:41 2017 +0100

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    | 110 ++++++++++---------
 .../transaction/TransactionStateManager.scala   |  22 +++-
 .../TransactionMarkerChannelManagerTest.scala   |   2 +-
 .../TransactionStateManagerTest.scala           |  44 +++++++-
 4 files changed, 121 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd01adc/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 2111a8f..cef25ac 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -253,7 +253,7 @@ class TransactionCoordinator(brokerId: Int,
   }
 
   def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
-      txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
+      txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch, writeTxnMarkers)
   }
 
   def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
@@ -299,6 +299,7 @@ class TransactionCoordinator(brokerId: Int,
       }
   }
 
+
   private def commitOrAbort(transactionalId: String,
                             pid: Long,
                             epoch: Short,
@@ -322,61 +323,66 @@ class TransactionCoordinator(brokerId: Int,
       if (errors == Errors.NONE)
         txnManager.coordinatorEpochFor(transactionalId) match {
           case Some(coordinatorEpoch) =>
-            def completionCallback(error: Errors): Unit = {
-              error match {
-                case Errors.NONE =>
-                  txnManager.getTransactionState(transactionalId) match {
-                    case Some(preparedCommitMetadata) =>
-                      val completedState = if (nextState == PrepareCommit) CompleteCommit else CompleteAbort
-                      val committedMetadata = new TransactionMetadata(pid,
-                        epoch,
-                        preparedCommitMetadata.txnTimeoutMs,
-                        completedState,
-                        preparedCommitMetadata.topicPartitions,
-                        preparedCommitMetadata.transactionStartTime,
-                        time.milliseconds())
-                      preparedCommitMetadata.prepareTransitionTo(completedState)
-
-                      def writeCommittedTransactionCallback(error: Errors): Unit =
-                        error match {
-                          case Errors.NONE =>
-                            trace(s"completed txn for transactionalId: $transactionalId state after commit: ${txnManager.getTransactionState(transactionalId)}")
-                            txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
-                          case Errors.NOT_COORDINATOR =>
-                            // this one should be completed by the new coordinator
-                            warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                          case _ =>
-                            warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
-                            // retry until success
-                            txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                        }
-
-                      txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                    case None =>
-                      // this one should be completed by the new coordinator
-                      warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                  }
-                case Errors.NOT_COORDINATOR =>
-                  warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                case _ =>
-                  warn(s"error: $error caught when writing transaction markers for transactionalId: $transactionalId. retrying")
-                  txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId),
-                    newMetadata,
-                    coordinatorEpoch,
-                    completionCallback)
-              }
-            }
-
-            txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId), newMetadata, coordinatorEpoch, completionCallback)
+            writeTxnMarkers(WriteTxnMarkerArgs(transactionalId, pid, epoch, nextState, newMetadata, coordinatorEpoch))
           case None =>
             // this one should be completed by the new coordinator
             warn(s"no longer the coordinator for transactionalId: $transactionalId")
         }
     }
-
     txnManager.appendTransactionToLog(transactionalId, newMetadata, logAppendCallback)
   }
 
+
+  private def writeTxnMarkers(transactionMarkersArgs: WriteTxnMarkerArgs) = {
+    def completionCallback(error: Errors): Unit = {
+      error match {
+        case Errors.NONE =>
+          txnManager.getTransactionState(transactionMarkersArgs.transactionalId) match {
+            case Some(preparedCommitMetadata) =>
+              val completedState = if (transactionMarkersArgs.nextState == PrepareCommit) CompleteCommit else CompleteAbort
+              val committedMetadata = new TransactionMetadata(transactionMarkersArgs.pid,
+                transactionMarkersArgs.epoch,
+                preparedCommitMetadata.txnTimeoutMs,
+                completedState,
+                preparedCommitMetadata.topicPartitions,
+                preparedCommitMetadata.transactionStartTime,
+                time.milliseconds())
+              preparedCommitMetadata.prepareTransitionTo(completedState)
+
+              def writeCommittedTransactionCallback(error: Errors): Unit = {
+                error match {
+                  case Errors.NONE =>
+                    txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionMarkersArgs.transactionalId), transactionMarkersArgs.pid)
+                  case Errors.NOT_COORDINATOR =>
+                    // this one should be completed by the new coordinator
+                    warn(s"no longer the coordinator for transactionalId: ${transactionMarkersArgs.transactionalId}")
+                  case _ =>
+                    warn(s"error: $error caught for transactionalId: ${transactionMarkersArgs.transactionalId} when appending state: $completedState. retrying")
+                    // retry until success
+                    txnManager.appendTransactionToLog(transactionMarkersArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
+                }
+              }
+              txnManager.appendTransactionToLog(transactionMarkersArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
+            case None =>
+              // this one should be completed by the new coordinator
+              warn(s"no longer the coordinator for transactionalId: ${transactionMarkersArgs.transactionalId}")
+          }
+        case Errors.NOT_COORDINATOR =>
+          warn(s"no longer the coordinator for transactionalId: ${transactionMarkersArgs.transactionalId}")
+        case _ =>
+          warn(s"error: $error caught when writing transaction markers for transactionalId: ${transactionMarkersArgs.transactionalId}. retrying")
+          txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionMarkersArgs.transactionalId),
+            transactionMarkersArgs.newMetadata,
+            transactionMarkersArgs.coordinatorEpoch,
+            completionCallback)
+      }
+    }
+    txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionMarkersArgs.transactionalId),
+      transactionMarkersArgs.newMetadata,
+      transactionMarkersArgs.coordinatorEpoch,
+      completionCallback)
+  }
+
   def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
 
   def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
@@ -408,4 +414,10 @@ class TransactionCoordinator(brokerId: Int,
   }
 }
 
-case class InitPidResult(pid: Long, epoch: Short, error: Errors)
\ No newline at end of file
+case class InitPidResult(pid: Long, epoch: Short, error: Errors)
+case class WriteTxnMarkerArgs(transactionalId: String,
+                              pid: Long,
+                              epoch: Short,
+                              nextState: TransactionState,
+                              newMetadata: TransactionMetadata,
+                              coordinatorEpoch: Int)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd01adc/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2e40a34..facd4b6 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.common.{KafkaException, Topic}
 import kafka.log.LogConfig
@@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 
-import scala.collection.{concurrent, mutable}
+import scala.collection.mutable
 import scala.collection.JavaConverters._
 
 
@@ -60,6 +60,8 @@ class TransactionStateManager(brokerId: Int,
 
   this.logIdent = "[Transaction Log Manager " + brokerId + "]: "
 
+  type WriteTxnMarkers = WriteTxnMarkerArgs => Unit
+
   /** shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
 
@@ -147,7 +149,7 @@ class TransactionStateManager(brokerId: Int,
     zkUtils.getTopicPartitionCount(Topic.TransactionStateTopicName).getOrElse(config.transactionLogNumPartitions)
   }
 
-  private def loadTransactionMetadata(topicPartition: TopicPartition) {
+  private def loadTransactionMetadata(topicPartition: TopicPartition, writeTxnMarkers: WriteTxnMarkers) {
     def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
 
     val startMs = time.milliseconds()
@@ -207,6 +209,16 @@ class TransactionStateManager(brokerId: Int,
 
                 throw new KafkaException("Loading transaction topic partition failed.")
               }
+              // if state is PrepareCommit or PrepareAbort we need to complete the transaction
+              if (currentTxnMetadata.state == PrepareCommit || currentTxnMetadata.state == PrepareAbort) {
+                writeTxnMarkers(WriteTxnMarkerArgs(transactionalId,
+                  txnMetadata.pid,
+                  txnMetadata.producerEpoch,
+                  txnMetadata.state,
+                  txnMetadata,
+                  coordinatorEpochFor(transactionalId).get
+                ))
+              }
           }
 
           removedTransactionalIds.foreach { transactionalId =>
@@ -229,7 +241,7 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a leader for a transaction log partition, load this partition and
    * populate the transaction metadata cache with the transactional ids.
    */
-  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int) {
+  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int, writeTxnMarkers: WriteTxnMarkers) {
     validateTransactionTopicPartitionCountIsStable()
 
     val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partition)
@@ -242,7 +254,7 @@ class TransactionStateManager(brokerId: Int,
     def loadTransactions() {
       info(s"Loading transaction metadata from $topicPartition")
       try {
-        loadTransactionMetadata(topicPartition)
+        loadTransactionMetadata(topicPartition, writeTxnMarkers)
       } catch {
         case t: Throwable => error(s"Error loading transactions from transaction log $topicPartition", t)
       } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd01adc/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 1c49151..29240a6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -17,7 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.api.{LeaderAndIsr, PartitionStateInfo}
-import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, InterBrokerSendThread}
+import kafka.common.{BrokerEndPointNotAvailableException, InterBrokerSendThread}
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.{MockTime, TestUtils}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd01adc/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 2edcb8f..708d038 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
     assertFalse(transactionManager.isCoordinatorFor(txnId1))
     assertFalse(transactionManager.isCoordinatorFor(txnId2))
 
-    transactionManager.loadTransactionsForPartition(partitionId, 0)
+    transactionManager.loadTransactionsForPartition(partitionId, 0, writeTxnMarkersCallback)
 
     // let the time advance to trigger the background thread loading
     scheduler.tick()
@@ -293,7 +293,7 @@ class TransactionStateManagerTest {
     val coordinatorEpoch = 10
     EasyMock.expect(replicaManager.getLog(EasyMock.anyObject(classOf[TopicPartition]))).andReturn(None)
     EasyMock.replay(replicaManager)
-    transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch)
+    transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch, writeTxnMarkersCallback)
     val epoch = transactionManager.coordinatorEpochFor(txnId1).get
     assertEquals(coordinatorEpoch, epoch)
   }
@@ -303,6 +303,39 @@ class TransactionStateManagerTest {
     assertEquals(None, transactionManager.coordinatorEpochFor(txnId1))
   }
 
+  @Test
+  def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = {
+    verifyWritesTxnMarkersInPrepareState(PrepareCommit)
+  }
+
+  @Test
+  def shouldWriteTxnMarkersForTransactionInPreparedAbortState(): Unit = {
+    verifyWritesTxnMarkersInPrepareState(PrepareAbort)
+  }
+
+  private def verifyWritesTxnMarkersInPrepareState(state: TransactionState): Unit = {
+    txnMetadata1.state = state
+    txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
+      new TopicPartition("topic1", 1)))
+
+    txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1))
+    val startOffset = 0L
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+
+    prepareTxnLog(topicPartition, 0, records)
+
+    var receivedArgs: WriteTxnMarkerArgs = null
+
+    def callback(writeTxnMarkerArgs: WriteTxnMarkerArgs): Unit = {
+      receivedArgs = writeTxnMarkerArgs
+    }
+
+    transactionManager.loadTransactionsForPartition(partitionId, 0, callback)
+    scheduler.tick()
+
+    assertEquals(txnId1, receivedArgs.transactionalId)
+  }
+
   private def assertCallback(error: Errors): Unit = {
     assertEquals(expectedError, error)
   }
@@ -351,4 +384,11 @@ class TransactionStateManagerTest {
 
     EasyMock.replay(replicaManager)
   }
+
+  def writeTxnMarkersCallback(writeTxnMarkerArgs: WriteTxnMarkerArgs): Unit = {
+
+
+  }
+
+
 }