You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/01 23:47:13 UTC

kafka git commit: KAFKA-5059 Follow-up: Remove broken locking and fix handleAddPartitions

Repository: kafka
Updated Branches:
  refs/heads/trunk 4aed28d18 -> 1f2451d4e


KAFKA-5059 Follow-up: Remove broken locking and fix handleAddPartitions

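Remove the broken coordinator locking. Fix handleAddPartitions so that it succeeds after a completed commit/abort.
Respond with CONCURRENT_TRANSACTIONS in initPid instead of waiting for the transaction to complete and retrying internally.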

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2934 from dguy/follow-up-tc-work


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f2451d4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f2451d4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f2451d4

Branch: refs/heads/trunk
Commit: 1f2451d4e7e3766540d3650d177e304fcddf49b8
Parents: 4aed28d
Author: Damian Guy <da...@gmail.com>
Authored: Mon May 1 16:47:08 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon May 1 16:47:08 2017 -0700

----------------------------------------------------------------------
 .../transaction/DelayedTxnMarker.scala          |  4 +-
 .../transaction/TransactionCoordinator.scala    | 79 ++++++-----------
 .../transaction/TransactionMetadata.scala       |  3 +-
 .../TransactionCoordinatorTest.scala            | 91 +++++++-------------
 4 files changed, 64 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2451d4/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index 9300e6a..313087c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -16,6 +16,8 @@
  */
 package kafka.coordinator.transaction
 
+import java.util.concurrent.TimeUnit
+
 import kafka.server.DelayedOperation
 import org.apache.kafka.common.protocol.Errors
 
@@ -24,7 +26,7 @@ import org.apache.kafka.common.protocol.Errors
   */
 private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
                                            completionCallback: Errors => Unit)
-  extends DelayedOperation(Long.MaxValue) {
+  extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) {
 
   // overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to
   // call purgatory operations while holding the group lock.

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2451d4/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 343f7ea..2111a8f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -91,8 +91,6 @@ class TransactionCoordinator(brokerId: Int,
   /* Active flag of the coordinator */
   private val isActive = new AtomicBoolean(false)
 
-  private val coordinatorLock = new ReentrantReadWriteLock
-
   def handleInitPid(transactionalId: String,
                     transactionTimeoutMs: Int,
                     responseCallback: InitPidCallback): Unit = {
@@ -148,7 +146,7 @@ class TransactionCoordinator(brokerId: Int,
       else
         initPidCallback(initTransactionError(errors))
     }
-    appendToLogInReadLock(transactionalId, metadata, callback)
+    txnManager.appendTransactionToLog(transactionalId, metadata, callback)
   }
 
 
@@ -168,18 +166,11 @@ class TransactionCoordinator(brokerId: Int,
             if (errors != Errors.NONE) {
               responseCallback(initTransactionError(errors))
             } else {
-              // init pid again
-              handleInitPid(transactionalId, transactionTimeoutMs, responseCallback)
+              responseCallback(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
             }
           })
       } else if (metadata.state == PrepareAbort || metadata.state == PrepareCommit) {
-        // wait for the commit to complete and then init pid again
-        txnMarkerPurgatory.tryCompleteElseWatch(new DelayedTxnMarker(metadata, (errors: Errors) => {
-          if (errors != Errors.NONE)
-            responseCallback(initTransactionError(errors))
-          else
-            handleInitPid(transactionalId, transactionTimeoutMs, responseCallback)
-        }), Seq(metadata.pid))
+        responseCallback(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
       } else {
         metadata.producerEpoch = (metadata.producerEpoch + 1).toShort
         metadata.txnTimeoutMs = transactionTimeoutMs
@@ -227,28 +218,34 @@ class TransactionCoordinator(brokerId: Int,
             } else if (metadata.pendingState.isDefined) {
               // return a retriable exception to let the client backoff and retry
               (Errors.CONCURRENT_TRANSACTIONS, null)
-            } else if (metadata.state != Empty && metadata.state != Ongoing) {
-              (Errors.INVALID_TXN_STATE, null)
-            } else if (partitions.subsetOf(metadata.topicPartitions)) {
-              // this is an optimization: if the partitions are already in the metadata reply OK immediately
-              (Errors.NONE, null)
+            } else if (metadata.state == PrepareCommit || metadata.state == PrepareAbort) {
+              (Errors.CONCURRENT_TRANSACTIONS, null)
             } else {
-              val now = time.milliseconds()
-              val newMetadata = new TransactionMetadata(pid,
-                epoch,
-                metadata.txnTimeoutMs,
-                Ongoing,
-                metadata.topicPartitions ++ partitions,
-                if (metadata.state == Empty) now else metadata.transactionStartTime,
-                now)
-              metadata.prepareTransitionTo(Ongoing)
-              (Errors.NONE, newMetadata)
+              if (metadata.state == CompleteAbort || metadata.state == CompleteCommit)
+                metadata.topicPartitions.clear()
+              if (partitions.subsetOf(metadata.topicPartitions)) {
+                // this is an optimization: if the partitions are already in the metadata reply OK immediately
+                (Errors.NONE, null)
+              } else {
+                val now = time.milliseconds()
+                val newMetadata = new TransactionMetadata(pid,
+                  epoch,
+                  metadata.txnTimeoutMs,
+                  Ongoing,
+                  metadata.topicPartitions ++ partitions,
+                  if (metadata.state == Empty || metadata.state == CompleteCommit || metadata.state == CompleteAbort)
+                    now
+                  else metadata.transactionStartTime,
+                  now)
+                metadata.prepareTransitionTo(Ongoing)
+                (Errors.NONE, newMetadata)
+              }
             }
           }
       }
 
       if (newMetadata != null) {
-        appendToLogInReadLock(transactionalId, newMetadata, responseCallback)
+        txnManager.appendTransactionToLog(transactionalId, newMetadata, responseCallback)
       } else {
         responseCallback(error)
       }
@@ -256,16 +253,12 @@ class TransactionCoordinator(brokerId: Int,
   }
 
   def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
-    inWriteLock(coordinatorLock) {
       txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
-    }
   }
 
   def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
-    inWriteLock(coordinatorLock) {
       txnManager.removeTransactionsForPartition(transactionStateTopicPartitionId)
       txnMarkerChannelManager.removeStateForPartition(transactionStateTopicPartitionId)
-    }
   }
 
   def handleEndTransaction(transactionalId: String,
@@ -306,23 +299,6 @@ class TransactionCoordinator(brokerId: Int,
       }
   }
 
-  private def appendToLogInReadLock(transactionalId: String,
-                                   metadata: TransactionMetadata,
-                                   callback: Errors =>Unit): Unit = {
-    def unlockCallback(errors:Errors): Unit = {
-      coordinatorLock.readLock().unlock()
-      callback(errors)
-    }
-    coordinatorLock.readLock().lock()
-    try {
-      txnManager.appendTransactionToLog(transactionalId,
-        metadata,
-        unlockCallback)
-    } catch {
-      case _:Throwable => coordinatorLock.readLock().unlock()
-    }
-
-  }
   private def commitOrAbort(transactionalId: String,
                             pid: Long,
                             epoch: Short,
@@ -364,6 +340,7 @@ class TransactionCoordinator(brokerId: Int,
                       def writeCommittedTransactionCallback(error: Errors): Unit =
                         error match {
                           case Errors.NONE =>
+                            trace(s"completed txn for transactionalId: $transactionalId state after commit: ${txnManager.getTransactionState(transactionalId)}")
                             txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
                           case Errors.NOT_COORDINATOR =>
                             // this one should be completed by the new coordinator
@@ -371,10 +348,10 @@ class TransactionCoordinator(brokerId: Int,
                           case _ =>
                             warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
                             // retry until success
-                            appendToLogInReadLock(transactionalId, committedMetadata, writeCommittedTransactionCallback)
+                            txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
                         }
 
-                      appendToLogInReadLock(transactionalId, committedMetadata, writeCommittedTransactionCallback)
+                      txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
                     case None =>
                       // this one should be completed by the new coordinator
                       warn(s"no longer the coordinator for transactionalId: $transactionalId")

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2451d4/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 15c428d..d84e054 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -152,8 +152,7 @@ private[coordinator] class TransactionMetadata(val pid: Long,
   def copy(): TransactionMetadata =
     new TransactionMetadata(pid, producerEpoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition] ++ topicPartitions, transactionStartTime, lastUpdateTimestamp)
 
-  override def toString: String =
-    s"(pid: $pid, epoch: $producerEpoch, transactionTimeoutMs: $txnTimeoutMs, transactionState: $state, topicPartitions: ${topicPartitions.mkString("(",",",")")})"
+  override def toString = s"TransactionMetadata($pendingState, $pid, $producerEpoch, $txnTimeoutMs, $state, $topicPartitions, $transactionStartTime, $lastUpdateTimestamp)"
 
   override def equals(that: Any): Boolean = that match {
     case other: TransactionMetadata =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2451d4/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 4d8d1c9..cf773bb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -196,26 +196,16 @@ class TransactionCoordinatorTest {
   }
 
   @Test
-  def shouldRespondWithInvalidTnxStateOnAddPartitionsWhenStateIsPrepareCommit(): Unit = {
-    validateInvalidTxnState(PrepareCommit)
+  def shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit(): Unit = {
+    validateConcurrentTransactions(PrepareCommit)
   }
 
   @Test
-  def shouldRespondWithInvalidTnxStateOnAddPartitionsWhenStateIsPrepareAbort(): Unit = {
-    validateInvalidTxnState(PrepareAbort)
+  def shouldRespondWithConcurrentTransactionOnAddPartitionsWhenStateIsPrepareAbort(): Unit = {
+    validateConcurrentTransactions(PrepareAbort)
   }
 
-  @Test
-  def shouldRespondWithInvalidTnxStateOnAddPartitionsWhenStateIsCompleteCommit(): Unit = {
-    validateInvalidTxnState(CompleteCommit)
-  }
-
-  @Test
-  def shouldRespondWithInvalidTnxStateOnAddPartitionsWhenStateIsCompleteAbort(): Unit = {
-    validateInvalidTxnState(CompleteAbort)
-  }
-
-  def validateInvalidTxnState(state: TransactionState): Unit = {
+  def validateConcurrentTransactions(state: TransactionState): Unit = {
     EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
       .andReturn(true)
     EasyMock.expect(transactionManager.getTransactionState(transactionalId))
@@ -224,7 +214,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback)
-    assertEquals(Errors.INVALID_TXN_STATE, error)
+    assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
   }
 
   @Test
@@ -242,14 +232,33 @@ class TransactionCoordinatorTest {
 
   @Test
   def shouldAppendNewMetadataToLogOnAddPartitionsWhenPartitionsAdded(): Unit = {
+    validateSuccessfulAddPartitions(Empty)
+  }
+
+  @Test
+  def shouldRespondWithSuccessOnAddPartitionsWhenStateIsOngoing(): Unit = {
+    validateSuccessfulAddPartitions(Ongoing)
+  }
+
+  @Test
+  def shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteCommit(): Unit = {
+    validateSuccessfulAddPartitions(CompleteCommit)
+  }
+
+  @Test
+  def shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteAbort(): Unit = {
+    validateSuccessfulAddPartitions(CompleteAbort)
+  }
+
+  def validateSuccessfulAddPartitions(previousState: TransactionState): Unit = {
     EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
       .andReturn(true)
     EasyMock.expect(transactionManager.getTransactionState(transactionalId))
-      .andReturn(Some(new TransactionMetadata(0, 0, 0, Empty, mutable.Set.empty, 0, 0)))
+      .andReturn(Some(new TransactionMetadata(0, 0, 0, previousState, mutable.Set.empty, 0, 0)))
 
     EasyMock.expect(transactionManager.appendTransactionToLog(
       EasyMock.eq(transactionalId),
-      EasyMock.eq(new TransactionMetadata(0, 0, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds())),
+      EasyMock.eq(new TransactionMetadata(0, 0, 0, Ongoing, partitions, if (previousState == Ongoing) 0 else time.milliseconds(), time.milliseconds())),
       EasyMock.capture(capturedErrorsCallback)
     ))
 
@@ -484,12 +493,12 @@ class TransactionCoordinatorTest {
 
   @Test
   def shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState(): Unit ={
-    validateWaitsForCompletionBeforeRespondingWithIncrementedEpoch(PrepareCommit)
+    validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(PrepareCommit)
   }
 
   @Test
   def shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareAbortState(): Unit ={
-    validateWaitsForCompletionBeforeRespondingWithIncrementedEpoch(PrepareAbort)
+    validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(PrepareAbort)
   }
 
   @Test
@@ -506,31 +515,12 @@ class TransactionCoordinatorTest {
 
     mockComplete(PrepareAbort)
 
-    EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
-      .andReturn(true).anyTimes()
-    EasyMock.expect(transactionManager.validateTransactionTimeoutMs(EasyMock.anyInt()))
-      .andReturn(true).anyTimes()
-
-    val completedMetadata = new TransactionMetadata(pid, epoch, txnTimeoutMs, CompleteAbort, mutable.Set.empty[TopicPartition], 0, 0)
-    EasyMock.expect(transactionManager.getTransactionState(transactionalId))
-      .andReturn(Some(completedMetadata))
-      .anyTimes()
-
-    EasyMock.expect(transactionManager.appendTransactionToLog(
-      EasyMock.eq(transactionalId),
-      EasyMock.anyObject(classOf[TransactionMetadata]),
-      EasyMock.capture(capturedErrorsCallback)
-    )).andAnswer(new IAnswer[Unit] {
-      override def answer(): Unit = {
-        capturedErrorsCallback.getValue.apply(Errors.NONE)
-      }
-    })
 
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
     coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
 
-    assertEquals(InitPidResult(10, 2, Errors.NONE), result)
+    assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
     EasyMock.verify(transactionManager)
   }
 
@@ -623,7 +613,7 @@ class TransactionCoordinatorTest {
     EasyMock.verify(transactionManager)
   }
 
-  private def validateWaitsForCompletionBeforeRespondingWithIncrementedEpoch(state: TransactionState) = {
+  private def validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(state: TransactionState) = {
     val transactionId = "tid"
     EasyMock.expect(transactionManager.isCoordinatorFor(transactionId))
       .andReturn(true).anyTimes()
@@ -634,28 +624,11 @@ class TransactionCoordinatorTest {
     EasyMock.expect(transactionManager.getTransactionState(transactionId))
       .andReturn(Some(metadata)).anyTimes()
 
-    EasyMock.expect(transactionManager.appendTransactionToLog(
-      EasyMock.eq(transactionId),
-      EasyMock.anyObject(classOf[TransactionMetadata]),
-      EasyMock.capture(capturedErrorsCallback)
-    )).andAnswer(new IAnswer[Unit] {
-      override def answer(): Unit = {
-        capturedErrorsCallback.getValue.apply(Errors.NONE)
-      }
-    })
-
     EasyMock.replay(transactionManager)
 
     coordinator.handleInitPid(transactionId, 10, initPidMockCallback)
-    // no result yet as hasn't completed
-    assertNull(result)
-    // complete the transaction
-    metadata.topicPartitions.clear()
-    metadata.state = if (state == PrepareCommit) CompleteCommit else CompleteAbort
-    txnMarkerPurgatory.checkAndComplete(0L)
 
-    assertEquals(InitPidResult(0, 1, Errors.NONE), result)
-    assertEquals(new TransactionMetadata(0, 1, 10, Empty, mutable.Set.empty, 0, time.milliseconds()), metadata)
+    assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
   }
 
   private def validateIncrementEpochAndUpdateMetadata(state: TransactionState) = {