You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/12 17:36:04 UTC

kafka git commit: KAFKA-5132: abort long running transactions

Repository: kafka
Updated Branches:
  refs/heads/trunk 766dea94e -> 495184916


KAFKA-5132: abort long running transactions

Abort any ongoing transactions that were started longer ago than their transaction timeout.

Author: Damian Guy <da...@gmail.com>

Reviewers: Jason Gustafson, Apurva Mehta, Ismael Juma, Guozhang Wang

Closes #2957 from dguy/kafka-5132


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49518491
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49518491
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49518491

Branch: refs/heads/trunk
Commit: 4951849163b1defea91129472b5354531407deb9
Parents: 766dea9
Author: Damian Guy <da...@gmail.com>
Authored: Fri May 12 10:36:02 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 12 10:36:02 2017 -0700

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    | 43 ++++++++++++-
 .../transaction/TransactionMarkerChannel.scala  |  3 +-
 .../TransactionMarkerChannelManager.scala       |  2 +-
 .../transaction/TransactionMetadata.scala       |  4 +-
 .../transaction/TransactionStateManager.scala   | 25 ++++++--
 .../main/scala/kafka/server/KafkaConfig.scala   |  6 ++
 .../TransactionCoordinatorTest.scala            | 66 ++++++++++++++++++++
 .../TransactionStateManagerTest.scala           | 32 ++++++++++
 8 files changed, 170 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 38e725f..982e009 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -44,14 +44,15 @@ object TransactionCoordinator {
       config.transactionTopicReplicationFactor,
       config.transactionTopicSegmentBytes,
       config.transactionsLoadBufferSize,
-      config.transactionTopicMinISR)
+      config.transactionTopicMinISR,
+      config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
 
     val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
     val logManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
     val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId)
     val transactionMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnMarkerPurgatory, time)
 
-    new TransactionCoordinator(config.brokerId, pidManager, logManager, transactionMarkerChannelManager, txnMarkerPurgatory, time)
+    new TransactionCoordinator(config.brokerId, pidManager, logManager, transactionMarkerChannelManager, txnMarkerPurgatory, scheduler, time)
   }
 
   private def initTransactionError(error: Errors): InitPidResult = {
@@ -76,6 +77,7 @@ class TransactionCoordinator(brokerId: Int,
                              txnManager: TransactionStateManager,
                              txnMarkerChannelManager: TransactionMarkerChannelManager,
                              txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
+                             scheduler: Scheduler,
                              time: Time) extends Logging {
   this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
 
@@ -383,11 +385,45 @@ class TransactionCoordinator(brokerId: Int,
 
   def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
 
+  private def expireTransactions(): Unit = {
+
+    txnManager.transactionsToExpire().foreach{ idAndMetadata =>
+      idAndMetadata.metadata synchronized {
+        if (!txnManager.isCoordinatorLoadingInProgress(idAndMetadata.transactionalId)
+          && idAndMetadata.metadata.pendingState.isEmpty) {
+          // bump the producerEpoch so that any further requests for this transactionalId will be fenced
+          idAndMetadata.metadata.producerEpoch = (idAndMetadata.metadata.producerEpoch + 1).toShort
+          idAndMetadata.metadata.prepareTransitionTo(Ongoing)
+          txnManager.appendTransactionToLog(idAndMetadata.transactionalId, idAndMetadata.metadata, (errors: Errors) => {
+            if (errors != Errors.NONE)
+              warn(s"failed to append transactionalId ${idAndMetadata.transactionalId} to log during transaction expiry. errors:$errors")
+            else
+              handleEndTransaction(idAndMetadata.transactionalId,
+                idAndMetadata.metadata.pid,
+                idAndMetadata.metadata.producerEpoch,
+                TransactionResult.ABORT,
+                (errors: Errors) => {
+                  if (errors != Errors.NONE)
+                    warn(s"rollback of transactionalId: ${idAndMetadata.transactionalId} failed during transaction expiry. errors: $errors")
+                }
+              )
+          })
+        }
+      }
+    }
+  }
+
   /**
    * Startup logic executed at the same time when the server starts up.
    */
   def startup(enablePidExpiration: Boolean = true) {
     info("Starting up.")
+    scheduler.startup()
+    scheduler.schedule("transaction-expiration",
+      expireTransactions,
+      TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs,
+      TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
+    )
     if (enablePidExpiration)
       txnManager.enablePidExpiration()
     txnMarkerChannelManager.start()
@@ -403,10 +439,11 @@ class TransactionCoordinator(brokerId: Int,
   def shutdown() {
     info("Shutting down.")
     isActive.set(false)
+    scheduler.shutdown()
+    txnMarkerPurgatory.shutdown()
     pidManager.shutdown()
     txnManager.shutdown()
     txnMarkerChannelManager.shutdown()
-    txnMarkerPurgatory.shutdown()
     info("Shutdown complete.")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
index cad3ea5..e60bd40 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
@@ -166,9 +166,10 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
     pendingTxnMap.get(PendingTxnKey(metadataPartition, pid))
   }
 
-  def clear(): Unit = {
+  def close(): Unit = {
     brokerStateMap.clear()
     pendingTxnMap.clear()
+    networkClient.close()
   }
 
   def removeStateForPartition(partition: Int): mutable.Iterable[Long] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 2c17564..1b7ea56 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -107,7 +107,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
   def shutdown(): Unit = {
     interBrokerSendThread.shutdown()
-    transactionMarkerChannel.clear()
+    transactionMarkerChannel.close()
   }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index d84e054..a81e47b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -161,8 +161,8 @@ private[coordinator] class TransactionMetadata(val pid: Long,
       txnTimeoutMs == other.txnTimeoutMs &&
       state.equals(other.state) &&
       topicPartitions.equals(other.topicPartitions) &&
-      transactionStartTime.equals(other.transactionStartTime) &&
-      lastUpdateTimestamp.equals(other.lastUpdateTimestamp)
+      transactionStartTime == other.transactionStartTime &&
+      lastUpdateTimestamp == other.lastUpdateTimestamp
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e23324f..f5dc3c0 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -43,6 +43,7 @@ object TransactionManager {
   // default transaction management config values
   val DefaultTransactionalIdExpirationMs = TimeUnit.DAYS.toMillis(7).toInt
   val DefaultTransactionsMaxTimeoutMs = TimeUnit.MINUTES.toMillis(15).toInt
+  val DefaultRemoveExpiredTransactionsIntervalMs = TimeUnit.MINUTES.toMillis(1).toInt
 }
 
 /**
@@ -82,9 +83,9 @@ class TransactionStateManager(brokerId: Int,
   private val transactionTopicPartitionCount = getTransactionTopicPartitionCount
 
   def enablePidExpiration() {
-    scheduler.startup()
-
-    // TODO: add transaction and pid expiration logic
+    if (!scheduler.isStarted)
+      scheduler.startup()
+    // TODO: add pid expiration logic
   }
 
   /**
@@ -142,6 +143,19 @@ class TransactionStateManager(brokerId: Int,
     loadingPartitions.contains(partitionId)
   }
 
+
+  def transactionsToExpire(): Iterable[TransactionalIdAndMetadata] = {
+    val now = time.milliseconds()
+    transactionMetadataCache.filter { case (_, metadata) =>
+      metadata.state match {
+        case Ongoing =>
+          metadata.transactionStartTime + metadata.txnTimeoutMs < now
+        case _ => false
+      }
+    }.map {case (id, metadata) =>
+      TransactionalIdAndMetadata(id, metadata)
+    }
+  }
   /**
    * Gets the partition count of the transaction log topic from ZooKeeper.
    * If the topic does not exist, the default partition count is returned.
@@ -445,4 +459,7 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I
                                                   transactionLogReplicationFactor: Short = TransactionLog.DefaultReplicationFactor,
                                                   transactionLogSegmentBytes: Int = TransactionLog.DefaultSegmentBytes,
                                                   transactionLogLoadBufferSize: Int = TransactionLog.DefaultLoadBufferSize,
-                                                  transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas)
+                                                  transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas,
+                                                  removeExpiredTransactionsIntervalMs: Int = TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+
+case class TransactionalIdAndMetadata(transactionalId: String, metadata: TransactionMetadata)

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 708201a..76f6380 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -165,6 +165,7 @@ object Defaults {
   val TransactionsTopicReplicationFactor = TransactionLog.DefaultReplicationFactor
   val TransactionsTopicPartitions = TransactionLog.DefaultNumPartitions
   val TransactionsTopicSegmentBytes = TransactionLog.DefaultSegmentBytes
+  val TransactionsExpiredTransactionCleanupIntervalMS = TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
@@ -349,6 +350,8 @@ object KafkaConfig {
   val TransactionsTopicPartitionsProp = "transaction.state.log.num.partitions"
   val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes"
   val TransactionsTopicReplicationFactorProp = "transaction.state.log.replication.factor"
+  val TransactionsExpiredTransactionCleanupIntervalMsProp = "transaction.expired.transaction.cleanup.interval.ms"
+
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
   val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
@@ -594,6 +597,7 @@ object KafkaConfig {
     "Internal topic creation will fail until the cluster size meets this replication factor requirement."
   val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
   val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
+  val TransactionsExpiredTransactionCleanupIntervalMsDoc = "The interval at which to rollback expired transactions"
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user>, <client-id> or <user, client-id> in Zookeeper. " +
@@ -799,6 +803,7 @@ object KafkaConfig {
       .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TransactionsTopicReplicationFactor, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc)
       .define(TransactionsTopicPartitionsProp, INT, Defaults.TransactionsTopicPartitions, atLeast(1), HIGH, TransactionsTopicPartitionsDoc)
       .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TransactionsTopicSegmentBytes, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc)
+      .define(TransactionsExpiredTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsExpiredTransactionCleanupIntervalMS, atLeast(1), LOW, TransactionsExpiredTransactionCleanupIntervalMsDoc)
 
       /** ********* Kafka Metrics Configuration ***********/
       .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc)
@@ -1008,6 +1013,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val transactionTopicReplicationFactor = getShort(KafkaConfig.TransactionsTopicReplicationFactorProp)
   val transactionTopicPartitions = getInt(KafkaConfig.TransactionsTopicPartitionsProp)
   val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
+  val transactionTransactionsExpiredTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsExpiredTransactionCleanupIntervalMsProp)
 
   /** ********* Metric Configuration **************/
   val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index cf773bb..a9f1bca 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.server.DelayedOperationPurgatory
+import kafka.utils.MockScheduler
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
@@ -46,12 +47,14 @@ class TransactionCoordinatorTest {
 
   private val txnMarkerPurgatory = new DelayedOperationPurgatory[DelayedTxnMarker]("test", new MockTimer, reaperEnabled = false)
   private val partitions = mutable.Set[TopicPartition](new TopicPartition("topic1", 0))
+  private val scheduler = new MockScheduler(time)
 
   val coordinator: TransactionCoordinator = new TransactionCoordinator(brokerId,
     pidManager,
     transactionManager,
     transactionMarkerChannelManager,
     txnMarkerPurgatory,
+    scheduler,
     time)
 
   var result: InitPidResult = _
@@ -613,6 +616,69 @@ class TransactionCoordinatorTest {
     EasyMock.verify(transactionManager)
   }
 
+  @Test
+  def shouldAbortExpiredTransactionsInOngoingState(): Unit = {
+    EasyMock.expect(transactionManager.transactionsToExpire())
+    .andReturn(List(TransactionalIdAndMetadata(transactionalId,
+      new TransactionMetadata(pid, epoch, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds()))))
+
+    // should bump the epoch and append to the log
+    val metadata = new TransactionMetadata(pid, (epoch + 1).toShort, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds())
+    EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
+      EasyMock.eq(metadata),
+      EasyMock.capture(capturedErrorsCallback)))
+    .andAnswer(new IAnswer[Unit] {
+      override def answer(): Unit = {
+        capturedErrorsCallback.getValue.apply(Errors.NONE)
+      }
+    }).once()
+
+    EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
+      .andReturn(true)
+    EasyMock.expect(transactionManager.getTransactionState(transactionalId))
+      .andReturn(Some(metadata))
+      .once()
+
+    // now should perform the rollback and append the state as PrepareAbort
+    val abortMetadata = metadata.copy()
+    abortMetadata.state = PrepareAbort
+    // need to allow for the time.sleep below
+    abortMetadata.lastUpdateTimestamp = time.milliseconds() + TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
+
+    EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
+      EasyMock.eq(abortMetadata),
+      EasyMock.capture(capturedErrorsCallback)))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {}
+      })
+    .once()
+
+    EasyMock.replay(transactionManager, transactionMarkerChannelManager)
+
+    coordinator.startup(false)
+    time.sleep(TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+    scheduler.tick()
+    EasyMock.verify(transactionManager)
+  }
+
+  @Test
+  def shouldNotAbortExpiredTransactionsThatHaveAPendingStateTransition(): Unit = {
+    val metadata = new TransactionMetadata(pid, epoch, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds())
+    metadata.prepareTransitionTo(PrepareCommit)
+
+    EasyMock.expect(transactionManager.transactionsToExpire())
+      .andReturn(List(TransactionalIdAndMetadata(transactionalId,
+        metadata)))
+    
+    EasyMock.replay(transactionManager, transactionMarkerChannelManager)
+    coordinator.startup(false)
+
+    time.sleep(TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+    scheduler.tick()
+    EasyMock.verify(transactionManager)
+
+  }
+
   private def validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(state: TransactionState) = {
     val transactionId = "tid"
     EasyMock.expect(transactionManager.isCoordinatorFor(transactionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 09a89dd..2a14898 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -305,6 +305,38 @@ class TransactionStateManagerTest {
   }
 
   @Test
+  def shouldOnlyConsiderTransactionsInTheOngoingStateForExpiry(): Unit = {
+    txnMetadata1.state = Ongoing
+    txnMetadata1.transactionStartTime = time.milliseconds()
+    transactionManager.addTransaction(txnId1, txnMetadata1)
+    transactionManager.addTransaction(txnId2, txnMetadata2)
+
+    val ongoingButNotExpiring = txnMetadata1.copy()
+    ongoingButNotExpiring.txnTimeoutMs = 10000
+    transactionManager.addTransaction("not-expiring", ongoingButNotExpiring)
+
+    val prepareCommit = txnMetadata1.copy()
+    prepareCommit.state = PrepareCommit
+    transactionManager.addTransaction("pc", prepareCommit)
+
+    val prepareAbort = txnMetadata1.copy()
+    prepareAbort.state = PrepareAbort
+    transactionManager.addTransaction("pa", prepareAbort)
+
+    val committed = txnMetadata1.copy()
+    committed.state = CompleteCommit
+    transactionManager.addTransaction("cc", committed)
+
+    val aborted = txnMetadata1.copy()
+    aborted.state = CompleteAbort
+    transactionManager.addTransaction("ca", aborted)
+
+    time.sleep(2000)
+    val expiring = transactionManager.transactionsToExpire()
+    assertEquals(List(TransactionalIdAndMetadata(txnId1, txnMetadata1)), expiring)
+  }
+
+  @Test
   def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = {
     verifyWritesTxnMarkersInPrepareState(PrepareCommit)
   }