You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/12 17:36:04 UTC
kafka git commit: KAFKA-5132: abort long running transactions
Repository: kafka
Updated Branches:
refs/heads/trunk 766dea94e -> 495184916
KAFKA-5132: abort long running transactions
Abort any ongoing transactions that have been open for longer than their transaction timeout.
Author: Damian Guy <da...@gmail.com>
Reviewers: Jason Gustafson, Apurva Mehta, Ismael Juma, Guozhang Wang
Closes #2957 from dguy/kafka-5132
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49518491
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49518491
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49518491
Branch: refs/heads/trunk
Commit: 4951849163b1defea91129472b5354531407deb9
Parents: 766dea9
Author: Damian Guy <da...@gmail.com>
Authored: Fri May 12 10:36:02 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 12 10:36:02 2017 -0700
----------------------------------------------------------------------
.../transaction/TransactionCoordinator.scala | 43 ++++++++++++-
.../transaction/TransactionMarkerChannel.scala | 3 +-
.../TransactionMarkerChannelManager.scala | 2 +-
.../transaction/TransactionMetadata.scala | 4 +-
.../transaction/TransactionStateManager.scala | 25 ++++++--
.../main/scala/kafka/server/KafkaConfig.scala | 6 ++
.../TransactionCoordinatorTest.scala | 66 ++++++++++++++++++++
.../TransactionStateManagerTest.scala | 32 ++++++++++
8 files changed, 170 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 38e725f..982e009 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -44,14 +44,15 @@ object TransactionCoordinator {
config.transactionTopicReplicationFactor,
config.transactionTopicSegmentBytes,
config.transactionsLoadBufferSize,
- config.transactionTopicMinISR)
+ config.transactionTopicMinISR,
+ config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
val logManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId)
val transactionMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnMarkerPurgatory, time)
- new TransactionCoordinator(config.brokerId, pidManager, logManager, transactionMarkerChannelManager, txnMarkerPurgatory, time)
+ new TransactionCoordinator(config.brokerId, pidManager, logManager, transactionMarkerChannelManager, txnMarkerPurgatory, scheduler, time)
}
private def initTransactionError(error: Errors): InitPidResult = {
@@ -76,6 +77,7 @@ class TransactionCoordinator(brokerId: Int,
txnManager: TransactionStateManager,
txnMarkerChannelManager: TransactionMarkerChannelManager,
txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
+ scheduler: Scheduler,
time: Time) extends Logging {
this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
@@ -383,11 +385,45 @@ class TransactionCoordinator(brokerId: Int,
def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
+ private def expireTransactions(): Unit = {
+
+ txnManager.transactionsToExpire().foreach{ idAndMetadata =>
+ idAndMetadata.metadata synchronized {
+ if (!txnManager.isCoordinatorLoadingInProgress(idAndMetadata.transactionalId)
+ && idAndMetadata.metadata.pendingState.isEmpty) {
+ // bump the producerEpoch so that any further requests for this transactionalId will be fenced
+ idAndMetadata.metadata.producerEpoch = (idAndMetadata.metadata.producerEpoch + 1).toShort
+ idAndMetadata.metadata.prepareTransitionTo(Ongoing)
+ txnManager.appendTransactionToLog(idAndMetadata.transactionalId, idAndMetadata.metadata, (errors: Errors) => {
+ if (errors != Errors.NONE)
+ warn(s"failed to append transactionalId ${idAndMetadata.transactionalId} to log during transaction expiry. errors:$errors")
+ else
+ handleEndTransaction(idAndMetadata.transactionalId,
+ idAndMetadata.metadata.pid,
+ idAndMetadata.metadata.producerEpoch,
+ TransactionResult.ABORT,
+ (errors: Errors) => {
+ if (errors != Errors.NONE)
+ warn(s"rollback of transactionalId: ${idAndMetadata.transactionalId} failed during transaction expiry. errors: $errors")
+ }
+ )
+ })
+ }
+ }
+ }
+ }
+
/**
* Startup logic executed at the same time when the server starts up.
*/
def startup(enablePidExpiration: Boolean = true) {
info("Starting up.")
+ scheduler.startup()
+ scheduler.schedule("transaction-expiration",
+ expireTransactions,
+ TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs,
+ TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
+ )
if (enablePidExpiration)
txnManager.enablePidExpiration()
txnMarkerChannelManager.start()
@@ -403,10 +439,11 @@ class TransactionCoordinator(brokerId: Int,
def shutdown() {
info("Shutting down.")
isActive.set(false)
+ scheduler.shutdown()
+ txnMarkerPurgatory.shutdown()
pidManager.shutdown()
txnManager.shutdown()
txnMarkerChannelManager.shutdown()
- txnMarkerPurgatory.shutdown()
info("Shutdown complete.")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
index cad3ea5..e60bd40 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
@@ -166,9 +166,10 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
pendingTxnMap.get(PendingTxnKey(metadataPartition, pid))
}
- def clear(): Unit = {
+ def close(): Unit = {
brokerStateMap.clear()
pendingTxnMap.clear()
+ networkClient.close()
}
def removeStateForPartition(partition: Int): mutable.Iterable[Long] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 2c17564..1b7ea56 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -107,7 +107,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
def shutdown(): Unit = {
interBrokerSendThread.shutdown()
- transactionMarkerChannel.clear()
+ transactionMarkerChannel.close()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index d84e054..a81e47b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -161,8 +161,8 @@ private[coordinator] class TransactionMetadata(val pid: Long,
txnTimeoutMs == other.txnTimeoutMs &&
state.equals(other.state) &&
topicPartitions.equals(other.topicPartitions) &&
- transactionStartTime.equals(other.transactionStartTime) &&
- lastUpdateTimestamp.equals(other.lastUpdateTimestamp)
+ transactionStartTime == other.transactionStartTime &&
+ lastUpdateTimestamp == other.lastUpdateTimestamp
case _ => false
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e23324f..f5dc3c0 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -43,6 +43,7 @@ object TransactionManager {
// default transaction management config values
val DefaultTransactionalIdExpirationMs = TimeUnit.DAYS.toMillis(7).toInt
val DefaultTransactionsMaxTimeoutMs = TimeUnit.MINUTES.toMillis(15).toInt
+ val DefaultRemoveExpiredTransactionsIntervalMs = TimeUnit.MINUTES.toMillis(1).toInt
}
/**
@@ -82,9 +83,9 @@ class TransactionStateManager(brokerId: Int,
private val transactionTopicPartitionCount = getTransactionTopicPartitionCount
def enablePidExpiration() {
- scheduler.startup()
-
- // TODO: add transaction and pid expiration logic
+ if (!scheduler.isStarted)
+ scheduler.startup()
+ // TODO: add pid expiration logic
}
/**
@@ -142,6 +143,19 @@ class TransactionStateManager(brokerId: Int,
loadingPartitions.contains(partitionId)
}
+
+ def transactionsToExpire(): Iterable[TransactionalIdAndMetadata] = {
+ val now = time.milliseconds()
+ transactionMetadataCache.filter { case (_, metadata) =>
+ metadata.state match {
+ case Ongoing =>
+ metadata.transactionStartTime + metadata.txnTimeoutMs < now
+ case _ => false
+ }
+ }.map {case (id, metadata) =>
+ TransactionalIdAndMetadata(id, metadata)
+ }
+ }
/**
* Gets the partition count of the transaction log topic from ZooKeeper.
* If the topic does not exist, the default partition count is returned.
@@ -445,4 +459,7 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I
transactionLogReplicationFactor: Short = TransactionLog.DefaultReplicationFactor,
transactionLogSegmentBytes: Int = TransactionLog.DefaultSegmentBytes,
transactionLogLoadBufferSize: Int = TransactionLog.DefaultLoadBufferSize,
- transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas)
+ transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas,
+ removeExpiredTransactionsIntervalMs: Int = TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+
+case class TransactionalIdAndMetadata(transactionalId: String, metadata: TransactionMetadata)
http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 708201a..76f6380 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -165,6 +165,7 @@ object Defaults {
val TransactionsTopicReplicationFactor = TransactionLog.DefaultReplicationFactor
val TransactionsTopicPartitions = TransactionLog.DefaultNumPartitions
val TransactionsTopicSegmentBytes = TransactionLog.DefaultSegmentBytes
+ val TransactionsExpiredTransactionCleanupIntervalMS = TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
@@ -349,6 +350,8 @@ object KafkaConfig {
val TransactionsTopicPartitionsProp = "transaction.state.log.num.partitions"
val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes"
val TransactionsTopicReplicationFactorProp = "transaction.state.log.replication.factor"
+ val TransactionsExpiredTransactionCleanupIntervalMsProp = "transaction.expired.transaction.cleanup.interval.ms"
+
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
@@ -594,6 +597,7 @@ object KafkaConfig {
"Internal topic creation will fail until the cluster size meets this replication factor requirement."
val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
+ val TransactionsExpiredTransactionCleanupIntervalMsDoc = "The interval at which to rollback expired transactions"
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user>, <client-id> or <user, client-id> in Zookeeper. " +
@@ -799,6 +803,7 @@ object KafkaConfig {
.define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TransactionsTopicReplicationFactor, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc)
.define(TransactionsTopicPartitionsProp, INT, Defaults.TransactionsTopicPartitions, atLeast(1), HIGH, TransactionsTopicPartitionsDoc)
.define(TransactionsTopicSegmentBytesProp, INT, Defaults.TransactionsTopicSegmentBytes, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc)
+ .define(TransactionsExpiredTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsExpiredTransactionCleanupIntervalMS, atLeast(1), LOW, TransactionsExpiredTransactionCleanupIntervalMsDoc)
/** ********* Kafka Metrics Configuration ***********/
.define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc)
@@ -1008,6 +1013,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val transactionTopicReplicationFactor = getShort(KafkaConfig.TransactionsTopicReplicationFactorProp)
val transactionTopicPartitions = getInt(KafkaConfig.TransactionsTopicPartitionsProp)
val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
+ val transactionTransactionsExpiredTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsExpiredTransactionCleanupIntervalMsProp)
/** ********* Metric Configuration **************/
val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index cf773bb..a9f1bca 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -17,6 +17,7 @@
package kafka.coordinator.transaction
import kafka.server.DelayedOperationPurgatory
+import kafka.utils.MockScheduler
import kafka.utils.timer.MockTimer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
@@ -46,12 +47,14 @@ class TransactionCoordinatorTest {
private val txnMarkerPurgatory = new DelayedOperationPurgatory[DelayedTxnMarker]("test", new MockTimer, reaperEnabled = false)
private val partitions = mutable.Set[TopicPartition](new TopicPartition("topic1", 0))
+ private val scheduler = new MockScheduler(time)
val coordinator: TransactionCoordinator = new TransactionCoordinator(brokerId,
pidManager,
transactionManager,
transactionMarkerChannelManager,
txnMarkerPurgatory,
+ scheduler,
time)
var result: InitPidResult = _
@@ -613,6 +616,69 @@ class TransactionCoordinatorTest {
EasyMock.verify(transactionManager)
}
+ @Test
+ def shouldAbortExpiredTransactionsInOngoingState(): Unit = {
+ EasyMock.expect(transactionManager.transactionsToExpire())
+ .andReturn(List(TransactionalIdAndMetadata(transactionalId,
+ new TransactionMetadata(pid, epoch, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds()))))
+
+ // should bump the epoch and append to the log
+ val metadata = new TransactionMetadata(pid, (epoch + 1).toShort, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds())
+ EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
+ EasyMock.eq(metadata),
+ EasyMock.capture(capturedErrorsCallback)))
+ .andAnswer(new IAnswer[Unit] {
+ override def answer(): Unit = {
+ capturedErrorsCallback.getValue.apply(Errors.NONE)
+ }
+ }).once()
+
+ EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
+ .andReturn(true)
+ EasyMock.expect(transactionManager.getTransactionState(transactionalId))
+ .andReturn(Some(metadata))
+ .once()
+
+ // now should perform the rollback and append the state as PrepareAbort
+ val abortMetadata = metadata.copy()
+ abortMetadata.state = PrepareAbort
+ // need to allow for the time.sleep below
+ abortMetadata.lastUpdateTimestamp = time.milliseconds() + TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
+
+ EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
+ EasyMock.eq(abortMetadata),
+ EasyMock.capture(capturedErrorsCallback)))
+ .andAnswer(new IAnswer[Unit] {
+ override def answer(): Unit = {}
+ })
+ .once()
+
+ EasyMock.replay(transactionManager, transactionMarkerChannelManager)
+
+ coordinator.startup(false)
+ time.sleep(TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+ scheduler.tick()
+ EasyMock.verify(transactionManager)
+ }
+
+ @Test
+ def shouldNotAbortExpiredTransactionsThatHaveAPendingStateTransition(): Unit = {
+ val metadata = new TransactionMetadata(pid, epoch, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds())
+ metadata.prepareTransitionTo(PrepareCommit)
+
+ EasyMock.expect(transactionManager.transactionsToExpire())
+ .andReturn(List(TransactionalIdAndMetadata(transactionalId,
+ metadata)))
+
+ EasyMock.replay(transactionManager, transactionMarkerChannelManager)
+ coordinator.startup(false)
+
+ time.sleep(TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+ scheduler.tick()
+ EasyMock.verify(transactionManager)
+
+ }
+
private def validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(state: TransactionState) = {
val transactionId = "tid"
EasyMock.expect(transactionManager.isCoordinatorFor(transactionId))
http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 09a89dd..2a14898 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -305,6 +305,38 @@ class TransactionStateManagerTest {
}
@Test
+ def shouldOnlyConsiderTransactionsInTheOngoingStateForExpiry(): Unit = {
+ txnMetadata1.state = Ongoing
+ txnMetadata1.transactionStartTime = time.milliseconds()
+ transactionManager.addTransaction(txnId1, txnMetadata1)
+ transactionManager.addTransaction(txnId2, txnMetadata2)
+
+ val ongoingButNotExpiring = txnMetadata1.copy()
+ ongoingButNotExpiring.txnTimeoutMs = 10000
+ transactionManager.addTransaction("not-expiring", ongoingButNotExpiring)
+
+ val prepareCommit = txnMetadata1.copy()
+ prepareCommit.state = PrepareCommit
+ transactionManager.addTransaction("pc", prepareCommit)
+
+ val prepareAbort = txnMetadata1.copy()
+ prepareAbort.state = PrepareAbort
+ transactionManager.addTransaction("pa", prepareAbort)
+
+ val committed = txnMetadata1.copy()
+ committed.state = CompleteCommit
+ transactionManager.addTransaction("cc", committed)
+
+ val aborted = txnMetadata1.copy()
+ aborted.state = CompleteAbort
+ transactionManager.addTransaction("ca", aborted)
+
+ time.sleep(2000)
+ val expiring = transactionManager.transactionsToExpire()
+ assertEquals(List(TransactionalIdAndMetadata(txnId1, txnMetadata1)), expiring)
+ }
+
+ @Test
def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = {
verifyWritesTxnMarkersInPrepareState(PrepareCommit)
}