You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/03 00:18:08 UTC
kafka git commit: KAFKA-5131; WriteTxnMarkers and complete commit/abort on partition immigration [Forced Update!]
Repository: kafka
Updated Branches:
refs/heads/trunk da68b1e39 -> 1cd01adcd (forced update)
KAFKA-5131; WriteTxnMarkers and complete commit/abort on partition immigration
Write txn markers and complete the commit/abort for transactions that are in the
PrepareCommit or PrepareAbort state during partition immigration.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #2926 from dguy/kafka-5059
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd01adc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd01adc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd01adc
Branch: refs/heads/trunk
Commit: 1cd01adcd194343e252e09769ae43a5c517efa55
Parents: b2fcf73
Author: Damian Guy <da...@gmail.com>
Authored: Wed May 3 01:01:39 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 3 01:17:41 2017 +0100
----------------------------------------------------------------------
.../transaction/TransactionCoordinator.scala | 110 ++++++++++---------
.../transaction/TransactionStateManager.scala | 22 +++-
.../TransactionMarkerChannelManagerTest.scala | 2 +-
.../TransactionStateManagerTest.scala | 44 +++++++-
4 files changed, 121 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd01adc/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 2111a8f..cef25ac 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -253,7 +253,7 @@ class TransactionCoordinator(brokerId: Int,
}
def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
- txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
+ txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch, writeTxnMarkers)
}
def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
@@ -299,6 +299,7 @@ class TransactionCoordinator(brokerId: Int,
}
}
+
private def commitOrAbort(transactionalId: String,
pid: Long,
epoch: Short,
@@ -322,61 +323,66 @@ class TransactionCoordinator(brokerId: Int,
if (errors == Errors.NONE)
txnManager.coordinatorEpochFor(transactionalId) match {
case Some(coordinatorEpoch) =>
- def completionCallback(error: Errors): Unit = {
- error match {
- case Errors.NONE =>
- txnManager.getTransactionState(transactionalId) match {
- case Some(preparedCommitMetadata) =>
- val completedState = if (nextState == PrepareCommit) CompleteCommit else CompleteAbort
- val committedMetadata = new TransactionMetadata(pid,
- epoch,
- preparedCommitMetadata.txnTimeoutMs,
- completedState,
- preparedCommitMetadata.topicPartitions,
- preparedCommitMetadata.transactionStartTime,
- time.milliseconds())
- preparedCommitMetadata.prepareTransitionTo(completedState)
-
- def writeCommittedTransactionCallback(error: Errors): Unit =
- error match {
- case Errors.NONE =>
- trace(s"completed txn for transactionalId: $transactionalId state after commit: ${txnManager.getTransactionState(transactionalId)}")
- txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
- case Errors.NOT_COORDINATOR =>
- // this one should be completed by the new coordinator
- warn(s"no longer the coordinator for transactionalId: $transactionalId")
- case _ =>
- warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
- // retry until success
- txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
- }
-
- txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
- case None =>
- // this one should be completed by the new coordinator
- warn(s"no longer the coordinator for transactionalId: $transactionalId")
- }
- case Errors.NOT_COORDINATOR =>
- warn(s"no longer the coordinator for transactionalId: $transactionalId")
- case _ =>
- warn(s"error: $error caught when writing transaction markers for transactionalId: $transactionalId. retrying")
- txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId),
- newMetadata,
- coordinatorEpoch,
- completionCallback)
- }
- }
-
- txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId), newMetadata, coordinatorEpoch, completionCallback)
+ writeTxnMarkers(WriteTxnMarkerArgs(transactionalId, pid, epoch, nextState, newMetadata, coordinatorEpoch))
case None =>
// this one should be completed by the new coordinator
warn(s"no longer the coordinator for transactionalId: $transactionalId")
}
}
-
txnManager.appendTransactionToLog(transactionalId, newMetadata, logAppendCallback)
}
+
+ private def writeTxnMarkers(transactionMarkersArgs: WriteTxnMarkerArgs) = {
+ def completionCallback(error: Errors): Unit = {
+ error match {
+ case Errors.NONE =>
+ txnManager.getTransactionState(transactionMarkersArgs.transactionalId) match {
+ case Some(preparedCommitMetadata) =>
+ val completedState = if (transactionMarkersArgs.nextState == PrepareCommit) CompleteCommit else CompleteAbort
+ val committedMetadata = new TransactionMetadata(transactionMarkersArgs.pid,
+ transactionMarkersArgs.epoch,
+ preparedCommitMetadata.txnTimeoutMs,
+ completedState,
+ preparedCommitMetadata.topicPartitions,
+ preparedCommitMetadata.transactionStartTime,
+ time.milliseconds())
+ preparedCommitMetadata.prepareTransitionTo(completedState)
+
+ def writeCommittedTransactionCallback(error: Errors): Unit = {
+ error match {
+ case Errors.NONE =>
+ txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionMarkersArgs.transactionalId), transactionMarkersArgs.pid)
+ case Errors.NOT_COORDINATOR =>
+ // this one should be completed by the new coordinator
+ warn(s"no longer the coordinator for transactionalId: ${transactionMarkersArgs.transactionalId}")
+ case _ =>
+ warn(s"error: $error caught for transactionalId: ${transactionMarkersArgs.transactionalId} when appending state: $completedState. retrying")
+ // retry until success
+ txnManager.appendTransactionToLog(transactionMarkersArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
+ }
+ }
+ txnManager.appendTransactionToLog(transactionMarkersArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
+ case None =>
+ // this one should be completed by the new coordinator
+ warn(s"no longer the coordinator for transactionalId: ${transactionMarkersArgs.transactionalId}")
+ }
+ case Errors.NOT_COORDINATOR =>
+ warn(s"no longer the coordinator for transactionalId: ${transactionMarkersArgs.transactionalId}")
+ case _ =>
+ warn(s"error: $error caught when writing transaction markers for transactionalId: ${transactionMarkersArgs.transactionalId}. retrying")
+ txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionMarkersArgs.transactionalId),
+ transactionMarkersArgs.newMetadata,
+ transactionMarkersArgs.coordinatorEpoch,
+ completionCallback)
+ }
+ }
+ txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionMarkersArgs.transactionalId),
+ transactionMarkersArgs.newMetadata,
+ transactionMarkersArgs.coordinatorEpoch,
+ completionCallback)
+ }
+
def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
@@ -408,4 +414,10 @@ class TransactionCoordinator(brokerId: Int,
}
}
-case class InitPidResult(pid: Long, epoch: Short, error: Errors)
\ No newline at end of file
+case class InitPidResult(pid: Long, epoch: Short, error: Errors)
+case class WriteTxnMarkerArgs(transactionalId: String,
+ pid: Long,
+ epoch: Short,
+ nextState: TransactionState,
+ newMetadata: TransactionMetadata,
+ coordinatorEpoch: Int)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd01adc/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2e40a34..facd4b6 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.ReentrantLock
import kafka.common.{KafkaException, Topic}
import kafka.log.LogConfig
@@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
-import scala.collection.{concurrent, mutable}
+import scala.collection.mutable
import scala.collection.JavaConverters._
@@ -60,6 +60,8 @@ class TransactionStateManager(brokerId: Int,
this.logIdent = "[Transaction Log Manager " + brokerId + "]: "
+ type WriteTxnMarkers = WriteTxnMarkerArgs => Unit
+
/** shutting down flag */
private val shuttingDown = new AtomicBoolean(false)
@@ -147,7 +149,7 @@ class TransactionStateManager(brokerId: Int,
zkUtils.getTopicPartitionCount(Topic.TransactionStateTopicName).getOrElse(config.transactionLogNumPartitions)
}
- private def loadTransactionMetadata(topicPartition: TopicPartition) {
+ private def loadTransactionMetadata(topicPartition: TopicPartition, writeTxnMarkers: WriteTxnMarkers) {
def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
val startMs = time.milliseconds()
@@ -207,6 +209,16 @@ class TransactionStateManager(brokerId: Int,
throw new KafkaException("Loading transaction topic partition failed.")
}
+ // if state is PrepareCommit or PrepareAbort we need to complete the transaction
+ if (currentTxnMetadata.state == PrepareCommit || currentTxnMetadata.state == PrepareAbort) {
+ writeTxnMarkers(WriteTxnMarkerArgs(transactionalId,
+ txnMetadata.pid,
+ txnMetadata.producerEpoch,
+ txnMetadata.state,
+ txnMetadata,
+ coordinatorEpochFor(transactionalId).get
+ ))
+ }
}
removedTransactionalIds.foreach { transactionalId =>
@@ -229,7 +241,7 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a leader for a transaction log partition, load this partition and
* populate the transaction metadata cache with the transactional ids.
*/
- def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int) {
+ def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int, writeTxnMarkers: WriteTxnMarkers) {
validateTransactionTopicPartitionCountIsStable()
val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partition)
@@ -242,7 +254,7 @@ class TransactionStateManager(brokerId: Int,
def loadTransactions() {
info(s"Loading transaction metadata from $topicPartition")
try {
- loadTransactionMetadata(topicPartition)
+ loadTransactionMetadata(topicPartition, writeTxnMarkers)
} catch {
case t: Throwable => error(s"Error loading transactions from transaction log $topicPartition", t)
} finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd01adc/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 1c49151..29240a6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.transaction
import kafka.api.{LeaderAndIsr, PartitionStateInfo}
-import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, InterBrokerSendThread}
+import kafka.common.{BrokerEndPointNotAvailableException, InterBrokerSendThread}
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
import kafka.utils.{MockTime, TestUtils}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd01adc/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 2edcb8f..708d038 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
assertFalse(transactionManager.isCoordinatorFor(txnId1))
assertFalse(transactionManager.isCoordinatorFor(txnId2))
- transactionManager.loadTransactionsForPartition(partitionId, 0)
+ transactionManager.loadTransactionsForPartition(partitionId, 0, writeTxnMarkersCallback)
// let the time advance to trigger the background thread loading
scheduler.tick()
@@ -293,7 +293,7 @@ class TransactionStateManagerTest {
val coordinatorEpoch = 10
EasyMock.expect(replicaManager.getLog(EasyMock.anyObject(classOf[TopicPartition]))).andReturn(None)
EasyMock.replay(replicaManager)
- transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch)
+ transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch, writeTxnMarkersCallback)
val epoch = transactionManager.coordinatorEpochFor(txnId1).get
assertEquals(coordinatorEpoch, epoch)
}
@@ -303,6 +303,39 @@ class TransactionStateManagerTest {
assertEquals(None, transactionManager.coordinatorEpochFor(txnId1))
}
+ @Test
+ def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = {
+ verifyWritesTxnMarkersInPrepareState(PrepareCommit)
+ }
+
+ @Test
+ def shouldWriteTxnMarkersForTransactionInPreparedAbortState(): Unit = {
+ verifyWritesTxnMarkersInPrepareState(PrepareAbort)
+ }
+
+ private def verifyWritesTxnMarkersInPrepareState(state: TransactionState): Unit = {
+ txnMetadata1.state = state
+ txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
+ new TopicPartition("topic1", 1)))
+
+ txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1))
+ val startOffset = 0L
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+
+ prepareTxnLog(topicPartition, 0, records)
+
+ var receivedArgs: WriteTxnMarkerArgs = null
+
+ def callback(writeTxnMarkerArgs: WriteTxnMarkerArgs): Unit = {
+ receivedArgs = writeTxnMarkerArgs
+ }
+
+ transactionManager.loadTransactionsForPartition(partitionId, 0, callback)
+ scheduler.tick()
+
+ assertEquals(txnId1, receivedArgs.transactionalId)
+ }
+
private def assertCallback(error: Errors): Unit = {
assertEquals(expectedError, error)
}
@@ -351,4 +384,11 @@ class TransactionStateManagerTest {
EasyMock.replay(replicaManager)
}
+
+ def writeTxnMarkersCallback(writeTxnMarkerArgs: WriteTxnMarkerArgs): Unit = {
+
+
+ }
+
+
}