You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/18 18:17:43 UTC
kafka git commit: KAFKA-5268;
Fix bounce test transient failure by clearing partitions before
writing Complete state to transaction log
Repository: kafka
Updated Branches:
refs/heads/trunk 45f226176 -> d662b09c9
KAFKA-5268; Fix bounce test transient failure by clearing partitions before writing Complete state to transaction log
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #3089 from hachikuji/KAFKA-5268
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d662b09c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d662b09c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d662b09c
Branch: refs/heads/trunk
Commit: d662b09c9f32d7d4dcfc18522a4d2789b43d319c
Parents: 45f2261
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu May 18 11:17:30 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu May 18 11:17:30 2017 -0700
----------------------------------------------------------------------
.../coordinator/transaction/TransactionCoordinator.scala | 2 +-
.../kafka/coordinator/transaction/TransactionMetadata.scala | 2 +-
.../coordinator/transaction/TransactionCoordinatorTest.scala | 8 +++++++-
3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index ebfbde5..8de6dbd 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -243,7 +243,7 @@ class TransactionCoordinator(brokerId: Int,
Left(Errors.CONCURRENT_TRANSACTIONS)
} else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
Left(Errors.CONCURRENT_TRANSACTIONS)
- } else if (partitions.subsetOf(txnMetadata.topicPartitions)) {
+ } else if (txnMetadata.state == Ongoing && partitions.subsetOf(txnMetadata.topicPartitions)) {
// this is an optimization: if the partitions are already in the metadata reply OK immediately
Left(Errors.NONE)
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index d739b9a..6e29308 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -185,7 +185,7 @@ private[transaction] class TransactionMetadata(val producerId: Long,
def prepareComplete(updateTimestamp: Long): TransactionMetadataTransition = {
val newState = if (state == PrepareCommit) CompleteCommit else CompleteAbort
- prepareTransitionTo(newState, producerEpoch, txnTimeoutMs, topicPartitions.toSet, txnStartTimestamp, updateTimestamp)
+ prepareTransitionTo(newState, producerEpoch, txnTimeoutMs, Set.empty[TopicPartition], txnStartTimestamp, updateTimestamp)
}
// visible for testing only
http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 7271edd..04f76bd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -672,7 +672,13 @@ class TransactionCoordinatorTest {
.andReturn(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareMetadata)))
.once()
- val newMetadata = prepareMetadata.copy().prepareComplete(now)
+ val newMetadata = TransactionMetadataTransition(producerId = pid,
+ producerEpoch = epoch,
+ txnTimeoutMs = txnTimeoutMs,
+ txnState = finalState,
+ topicPartitions = Set.empty[TopicPartition],
+ txnStartTimestamp = prepareMetadata.txnStartTimestamp,
+ txnLastUpdateTimestamp = now)
EasyMock.expect(transactionMarkerChannelManager.addTxnMarkersToSend(
EasyMock.eq(transactionalId),
EasyMock.eq(coordinatorEpoch),