You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/18 18:17:43 UTC

kafka git commit: KAFKA-5268; Fix bounce test transient failure by clearing partitions before writing Complete state to transaction log

Repository: kafka
Updated Branches:
  refs/heads/trunk 45f226176 -> d662b09c9


KAFKA-5268; Fix bounce test transient failure by clearing partitions before writing Complete state to transaction log

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #3089 from hachikuji/KAFKA-5268


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d662b09c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d662b09c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d662b09c

Branch: refs/heads/trunk
Commit: d662b09c9f32d7d4dcfc18522a4d2789b43d319c
Parents: 45f2261
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu May 18 11:17:30 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu May 18 11:17:30 2017 -0700

----------------------------------------------------------------------
 .../coordinator/transaction/TransactionCoordinator.scala     | 2 +-
 .../kafka/coordinator/transaction/TransactionMetadata.scala  | 2 +-
 .../coordinator/transaction/TransactionCoordinatorTest.scala | 8 +++++++-
 3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index ebfbde5..8de6dbd 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -243,7 +243,7 @@ class TransactionCoordinator(brokerId: Int,
               Left(Errors.CONCURRENT_TRANSACTIONS)
             } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
               Left(Errors.CONCURRENT_TRANSACTIONS)
-            } else if (partitions.subsetOf(txnMetadata.topicPartitions)) {
+            } else if (txnMetadata.state == Ongoing && partitions.subsetOf(txnMetadata.topicPartitions)) {
               // this is an optimization: if the partitions are already in the metadata reply OK immediately
               Left(Errors.NONE)
             } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index d739b9a..6e29308 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -185,7 +185,7 @@ private[transaction] class TransactionMetadata(val producerId: Long,
 
   def prepareComplete(updateTimestamp: Long): TransactionMetadataTransition = {
     val newState = if (state == PrepareCommit) CompleteCommit else CompleteAbort
-    prepareTransitionTo(newState, producerEpoch, txnTimeoutMs, topicPartitions.toSet, txnStartTimestamp, updateTimestamp)
+    prepareTransitionTo(newState, producerEpoch, txnTimeoutMs, Set.empty[TopicPartition], txnStartTimestamp, updateTimestamp)
   }
 
   // visible for testing only

http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 7271edd..04f76bd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -672,7 +672,13 @@ class TransactionCoordinatorTest {
       .andReturn(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareMetadata)))
       .once()
 
-    val newMetadata = prepareMetadata.copy().prepareComplete(now)
+    val newMetadata = TransactionMetadataTransition(producerId = pid,
+      producerEpoch = epoch,
+      txnTimeoutMs = txnTimeoutMs,
+      txnState = finalState,
+      topicPartitions = Set.empty[TopicPartition],
+      txnStartTimestamp = prepareMetadata.txnStartTimestamp,
+      txnLastUpdateTimestamp = now)
     EasyMock.expect(transactionMarkerChannelManager.addTxnMarkersToSend(
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),