You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/01 23:47:13 UTC
kafka git commit: KAFKA-5059 Follow-up: Remove broken locking and fix handleAddPartitions
Repository: kafka
Updated Branches:
refs/heads/trunk 4aed28d18 -> 1f2451d4e
KAFKA-5059 Follow-up: Remove broken locking and fix handleAddPartitions
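Remove broken locking. Fix handleAddPartitions so that partitions can be added after a transaction has reached CompleteCommit/CompleteAbort; while it is still in PrepareCommit/PrepareAbort, respond with CONCURRENT_TRANSACTIONS.
Respond with CONCURRENT_TRANSACTIONS in initPid instead of waiting for a pending commit/abort to complete and then retrying initPid.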
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2934 from dguy/follow-up-tc-work
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f2451d4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f2451d4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f2451d4
Branch: refs/heads/trunk
Commit: 1f2451d4e7e3766540d3650d177e304fcddf49b8
Parents: 4aed28d
Author: Damian Guy <da...@gmail.com>
Authored: Mon May 1 16:47:08 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon May 1 16:47:08 2017 -0700
----------------------------------------------------------------------
.../transaction/DelayedTxnMarker.scala | 4 +-
.../transaction/TransactionCoordinator.scala | 79 ++++++-----------
.../transaction/TransactionMetadata.scala | 3 +-
.../TransactionCoordinatorTest.scala | 91 +++++++-------------
4 files changed, 64 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2451d4/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index 9300e6a..313087c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -16,6 +16,8 @@
*/
package kafka.coordinator.transaction
+import java.util.concurrent.TimeUnit
+
import kafka.server.DelayedOperation
import org.apache.kafka.common.protocol.Errors
@@ -24,7 +26,7 @@ import org.apache.kafka.common.protocol.Errors
*/
private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
completionCallback: Errors => Unit)
- extends DelayedOperation(Long.MaxValue) {
+ extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) {
// overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to
// call purgatory operations while holding the group lock.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2451d4/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 343f7ea..2111a8f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -91,8 +91,6 @@ class TransactionCoordinator(brokerId: Int,
/* Active flag of the coordinator */
private val isActive = new AtomicBoolean(false)
- private val coordinatorLock = new ReentrantReadWriteLock
-
def handleInitPid(transactionalId: String,
transactionTimeoutMs: Int,
responseCallback: InitPidCallback): Unit = {
@@ -148,7 +146,7 @@ class TransactionCoordinator(brokerId: Int,
else
initPidCallback(initTransactionError(errors))
}
- appendToLogInReadLock(transactionalId, metadata, callback)
+ txnManager.appendTransactionToLog(transactionalId, metadata, callback)
}
@@ -168,18 +166,11 @@ class TransactionCoordinator(brokerId: Int,
if (errors != Errors.NONE) {
responseCallback(initTransactionError(errors))
} else {
- // init pid again
- handleInitPid(transactionalId, transactionTimeoutMs, responseCallback)
+ responseCallback(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
}
})
} else if (metadata.state == PrepareAbort || metadata.state == PrepareCommit) {
- // wait for the commit to complete and then init pid again
- txnMarkerPurgatory.tryCompleteElseWatch(new DelayedTxnMarker(metadata, (errors: Errors) => {
- if (errors != Errors.NONE)
- responseCallback(initTransactionError(errors))
- else
- handleInitPid(transactionalId, transactionTimeoutMs, responseCallback)
- }), Seq(metadata.pid))
+ responseCallback(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
} else {
metadata.producerEpoch = (metadata.producerEpoch + 1).toShort
metadata.txnTimeoutMs = transactionTimeoutMs
@@ -227,28 +218,34 @@ class TransactionCoordinator(brokerId: Int,
} else if (metadata.pendingState.isDefined) {
// return a retriable exception to let the client backoff and retry
(Errors.CONCURRENT_TRANSACTIONS, null)
- } else if (metadata.state != Empty && metadata.state != Ongoing) {
- (Errors.INVALID_TXN_STATE, null)
- } else if (partitions.subsetOf(metadata.topicPartitions)) {
- // this is an optimization: if the partitions are already in the metadata reply OK immediately
- (Errors.NONE, null)
+ } else if (metadata.state == PrepareCommit || metadata.state == PrepareAbort) {
+ (Errors.CONCURRENT_TRANSACTIONS, null)
} else {
- val now = time.milliseconds()
- val newMetadata = new TransactionMetadata(pid,
- epoch,
- metadata.txnTimeoutMs,
- Ongoing,
- metadata.topicPartitions ++ partitions,
- if (metadata.state == Empty) now else metadata.transactionStartTime,
- now)
- metadata.prepareTransitionTo(Ongoing)
- (Errors.NONE, newMetadata)
+ if (metadata.state == CompleteAbort || metadata.state == CompleteCommit)
+ metadata.topicPartitions.clear()
+ if (partitions.subsetOf(metadata.topicPartitions)) {
+ // this is an optimization: if the partitions are already in the metadata reply OK immediately
+ (Errors.NONE, null)
+ } else {
+ val now = time.milliseconds()
+ val newMetadata = new TransactionMetadata(pid,
+ epoch,
+ metadata.txnTimeoutMs,
+ Ongoing,
+ metadata.topicPartitions ++ partitions,
+ if (metadata.state == Empty || metadata.state == CompleteCommit || metadata.state == CompleteAbort)
+ now
+ else metadata.transactionStartTime,
+ now)
+ metadata.prepareTransitionTo(Ongoing)
+ (Errors.NONE, newMetadata)
+ }
}
}
}
if (newMetadata != null) {
- appendToLogInReadLock(transactionalId, newMetadata, responseCallback)
+ txnManager.appendTransactionToLog(transactionalId, newMetadata, responseCallback)
} else {
responseCallback(error)
}
@@ -256,16 +253,12 @@ class TransactionCoordinator(brokerId: Int,
}
def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
- inWriteLock(coordinatorLock) {
txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
- }
}
def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
- inWriteLock(coordinatorLock) {
txnManager.removeTransactionsForPartition(transactionStateTopicPartitionId)
txnMarkerChannelManager.removeStateForPartition(transactionStateTopicPartitionId)
- }
}
def handleEndTransaction(transactionalId: String,
@@ -306,23 +299,6 @@ class TransactionCoordinator(brokerId: Int,
}
}
- private def appendToLogInReadLock(transactionalId: String,
- metadata: TransactionMetadata,
- callback: Errors =>Unit): Unit = {
- def unlockCallback(errors:Errors): Unit = {
- coordinatorLock.readLock().unlock()
- callback(errors)
- }
- coordinatorLock.readLock().lock()
- try {
- txnManager.appendTransactionToLog(transactionalId,
- metadata,
- unlockCallback)
- } catch {
- case _:Throwable => coordinatorLock.readLock().unlock()
- }
-
- }
private def commitOrAbort(transactionalId: String,
pid: Long,
epoch: Short,
@@ -364,6 +340,7 @@ class TransactionCoordinator(brokerId: Int,
def writeCommittedTransactionCallback(error: Errors): Unit =
error match {
case Errors.NONE =>
+ trace(s"completed txn for transactionalId: $transactionalId state after commit: ${txnManager.getTransactionState(transactionalId)}")
txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
case Errors.NOT_COORDINATOR =>
// this one should be completed by the new coordinator
@@ -371,10 +348,10 @@ class TransactionCoordinator(brokerId: Int,
case _ =>
warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
// retry until success
- appendToLogInReadLock(transactionalId, committedMetadata, writeCommittedTransactionCallback)
+ txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
}
- appendToLogInReadLock(transactionalId, committedMetadata, writeCommittedTransactionCallback)
+ txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
case None =>
// this one should be completed by the new coordinator
warn(s"no longer the coordinator for transactionalId: $transactionalId")
http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2451d4/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 15c428d..d84e054 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -152,8 +152,7 @@ private[coordinator] class TransactionMetadata(val pid: Long,
def copy(): TransactionMetadata =
new TransactionMetadata(pid, producerEpoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition] ++ topicPartitions, transactionStartTime, lastUpdateTimestamp)
- override def toString: String =
- s"(pid: $pid, epoch: $producerEpoch, transactionTimeoutMs: $txnTimeoutMs, transactionState: $state, topicPartitions: ${topicPartitions.mkString("(",",",")")})"
+ override def toString = s"TransactionMetadata($pendingState, $pid, $producerEpoch, $txnTimeoutMs, $state, $topicPartitions, $transactionStartTime, $lastUpdateTimestamp)"
override def equals(that: Any): Boolean = that match {
case other: TransactionMetadata =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2451d4/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 4d8d1c9..cf773bb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -196,26 +196,16 @@ class TransactionCoordinatorTest {
}
@Test
- def shouldRespondWithInvalidTnxStateOnAddPartitionsWhenStateIsPrepareCommit(): Unit = {
- validateInvalidTxnState(PrepareCommit)
+ def shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit(): Unit = {
+ validateConcurrentTransactions(PrepareCommit)
}
@Test
- def shouldRespondWithInvalidTnxStateOnAddPartitionsWhenStateIsPrepareAbort(): Unit = {
- validateInvalidTxnState(PrepareAbort)
+ def shouldRespondWithConcurrentTransactionOnAddPartitionsWhenStateIsPrepareAbort(): Unit = {
+ validateConcurrentTransactions(PrepareAbort)
}
- @Test
- def shouldRespondWithInvalidTnxStateOnAddPartitionsWhenStateIsCompleteCommit(): Unit = {
- validateInvalidTxnState(CompleteCommit)
- }
-
- @Test
- def shouldRespondWithInvalidTnxStateOnAddPartitionsWhenStateIsCompleteAbort(): Unit = {
- validateInvalidTxnState(CompleteAbort)
- }
-
- def validateInvalidTxnState(state: TransactionState): Unit = {
+ def validateConcurrentTransactions(state: TransactionState): Unit = {
EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
.andReturn(true)
EasyMock.expect(transactionManager.getTransactionState(transactionalId))
@@ -224,7 +214,7 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback)
- assertEquals(Errors.INVALID_TXN_STATE, error)
+ assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
}
@Test
@@ -242,14 +232,33 @@ class TransactionCoordinatorTest {
@Test
def shouldAppendNewMetadataToLogOnAddPartitionsWhenPartitionsAdded(): Unit = {
+ validateSuccessfulAddPartitions(Empty)
+ }
+
+ @Test
+ def shouldRespondWithSuccessOnAddPartitionsWhenStateIsOngoing(): Unit = {
+ validateSuccessfulAddPartitions(Ongoing)
+ }
+
+ @Test
+ def shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteCommit(): Unit = {
+ validateSuccessfulAddPartitions(CompleteCommit)
+ }
+
+ @Test
+ def shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteAbort(): Unit = {
+ validateSuccessfulAddPartitions(CompleteAbort)
+ }
+
+ def validateSuccessfulAddPartitions(previousState: TransactionState): Unit = {
EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
.andReturn(true)
EasyMock.expect(transactionManager.getTransactionState(transactionalId))
- .andReturn(Some(new TransactionMetadata(0, 0, 0, Empty, mutable.Set.empty, 0, 0)))
+ .andReturn(Some(new TransactionMetadata(0, 0, 0, previousState, mutable.Set.empty, 0, 0)))
EasyMock.expect(transactionManager.appendTransactionToLog(
EasyMock.eq(transactionalId),
- EasyMock.eq(new TransactionMetadata(0, 0, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds())),
+ EasyMock.eq(new TransactionMetadata(0, 0, 0, Ongoing, partitions, if (previousState == Ongoing) 0 else time.milliseconds(), time.milliseconds())),
EasyMock.capture(capturedErrorsCallback)
))
@@ -484,12 +493,12 @@ class TransactionCoordinatorTest {
@Test
def shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState(): Unit ={
- validateWaitsForCompletionBeforeRespondingWithIncrementedEpoch(PrepareCommit)
+ validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(PrepareCommit)
}
@Test
def shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareAbortState(): Unit ={
- validateWaitsForCompletionBeforeRespondingWithIncrementedEpoch(PrepareAbort)
+ validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(PrepareAbort)
}
@Test
@@ -506,31 +515,12 @@ class TransactionCoordinatorTest {
mockComplete(PrepareAbort)
- EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
- .andReturn(true).anyTimes()
- EasyMock.expect(transactionManager.validateTransactionTimeoutMs(EasyMock.anyInt()))
- .andReturn(true).anyTimes()
-
- val completedMetadata = new TransactionMetadata(pid, epoch, txnTimeoutMs, CompleteAbort, mutable.Set.empty[TopicPartition], 0, 0)
- EasyMock.expect(transactionManager.getTransactionState(transactionalId))
- .andReturn(Some(completedMetadata))
- .anyTimes()
-
- EasyMock.expect(transactionManager.appendTransactionToLog(
- EasyMock.eq(transactionalId),
- EasyMock.anyObject(classOf[TransactionMetadata]),
- EasyMock.capture(capturedErrorsCallback)
- )).andAnswer(new IAnswer[Unit] {
- override def answer(): Unit = {
- capturedErrorsCallback.getValue.apply(Errors.NONE)
- }
- })
EasyMock.replay(transactionManager, transactionMarkerChannelManager)
coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
- assertEquals(InitPidResult(10, 2, Errors.NONE), result)
+ assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
EasyMock.verify(transactionManager)
}
@@ -623,7 +613,7 @@ class TransactionCoordinatorTest {
EasyMock.verify(transactionManager)
}
- private def validateWaitsForCompletionBeforeRespondingWithIncrementedEpoch(state: TransactionState) = {
+ private def validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(state: TransactionState) = {
val transactionId = "tid"
EasyMock.expect(transactionManager.isCoordinatorFor(transactionId))
.andReturn(true).anyTimes()
@@ -634,28 +624,11 @@ class TransactionCoordinatorTest {
EasyMock.expect(transactionManager.getTransactionState(transactionId))
.andReturn(Some(metadata)).anyTimes()
- EasyMock.expect(transactionManager.appendTransactionToLog(
- EasyMock.eq(transactionId),
- EasyMock.anyObject(classOf[TransactionMetadata]),
- EasyMock.capture(capturedErrorsCallback)
- )).andAnswer(new IAnswer[Unit] {
- override def answer(): Unit = {
- capturedErrorsCallback.getValue.apply(Errors.NONE)
- }
- })
-
EasyMock.replay(transactionManager)
coordinator.handleInitPid(transactionId, 10, initPidMockCallback)
- // no result yet as hasn't completed
- assertNull(result)
- // complete the transaction
- metadata.topicPartitions.clear()
- metadata.state = if (state == PrepareCommit) CompleteCommit else CompleteAbort
- txnMarkerPurgatory.checkAndComplete(0L)
- assertEquals(InitPidResult(0, 1, Errors.NONE), result)
- assertEquals(new TransactionMetadata(0, 1, 10, Empty, mutable.Set.empty, 0, time.milliseconds()), metadata)
+ assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
}
private def validateIncrementEpochAndUpdateMetadata(state: TransactionState) = {