You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/17 21:41:10 UTC

[kafka] branch 2.5 updated: KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 9d68b8e  KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)
9d68b8e is described below

commit 9d68b8e3db2df135c799fc9523c99570d1ed6a26
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 17 14:40:02 2020 -0700

    KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)
    
    In prepareAddPartitions the txnStartTimestamp could be updated to updateTimestamp, which is assumed to always be larger than the original startTimestamp. However, due to an NTP time shift the clock may go backwards, and hence the newStartTimestamp can be smaller than the original one. Then later in completeTransitionTo the timestamp check would fail with an IllegalStateException, and the txn would not transition to Ongoing.
    
    An indirect result of this is that the txn would NEVER be expired, because only Ongoing transactions are checked for expiration.
    
    We should do the same as in #3286 to remove this check.
    
    Also added test coverage for both KAFKA-5415 and KAFKA-8803.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../transaction/TransactionMetadata.scala          |   3 +-
 .../transaction/TransactionMetadataTest.scala      | 215 ++++++++++++++++++---
 2 files changed, 194 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 4b57abf..24b418a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -378,8 +378,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
         case Ongoing => // from addPartitions
           if (!validProducerEpoch(transitMetadata) ||
             !topicPartitions.subsetOf(transitMetadata.topicPartitions) ||
-            txnTimeoutMs != transitMetadata.txnTimeoutMs ||
-            txnStartTimestamp > transitMetadata.txnStartTimestamp) {
+            txnTimeoutMs != transitMetadata.txnTimeoutMs) {
 
             throwStateTransitionFailure(transitMetadata)
           } else {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
index 506e68d..85ee263 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.utils.MockTime
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.junit.Assert._
@@ -27,11 +28,11 @@ import scala.collection.mutable
 class TransactionMetadataTest {
 
   val time = new MockTime()
+  val producerId = 23423L
+  val transactionalId = "txnlId"
 
   @Test
   def testInitializeEpoch(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = RecordBatch.NO_PRODUCER_EPOCH
 
     val txnMetadata = new TransactionMetadata(
@@ -55,8 +56,6 @@ class TransactionMetadataTest {
 
   @Test
   def testNormalEpochBump(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -79,8 +78,6 @@ class TransactionMetadataTest {
 
   @Test(expected = classOf[IllegalStateException])
   def testBumpEpochNotAllowedIfEpochsExhausted(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = (Short.MaxValue - 1).toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -99,9 +96,197 @@ class TransactionMetadataTest {
   }
 
   @Test
+  def testTolerateUpdateTimeShiftDuringEpochBump(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Empty,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // simulate a backwards clock shift (update time 1 ms before time.milliseconds()); the epoch bump must still complete and keep txnStartTimestamp
+    val transitMetadata = txnMetadata.prepareIncrementProducerEpoch(30000, Option(producerEpoch), time.milliseconds() - 1).right.get
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(producerEpoch + 1, txnMetadata.producerEpoch)
+    assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateUpdateTimeResetDuringProducerIdRotation(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Empty,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // simulate a backwards clock shift (update time 1 ms before time.milliseconds()); the producer id rotation must still complete and keep txnStartTimestamp
+    val transitMetadata = txnMetadata.prepareProducerIdRotation(producerId + 1, 30000, time.milliseconds() - 1, recordLastEpoch = true)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(producerId + 1, txnMetadata.producerId)
+    assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
+    assertEquals(0, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringAddPartitions(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Empty,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = time.milliseconds(),
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // simulate a backwards clock shift; when transitioning from Empty, the start timestamp is set to this (earlier) update time
+    var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)), txnMetadata.topicPartitions)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+
+    // add another partition with an even earlier update time; once Ongoing, the start timestamp must stay unchanged (KAFKA-8803)
+    transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds() - 2)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)), txnMetadata.topicPartitions)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 2, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringPrepareCommit(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Ongoing,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // simulate a backwards clock shift (update time 1 ms before time.milliseconds()); Ongoing -> PrepareCommit must still complete
+    val transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareCommit, time.milliseconds() - 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(PrepareCommit, txnMetadata.state)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringPrepareAbort(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Ongoing,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // simulate a backwards clock shift (update time 1 ms before time.milliseconds()); Ongoing -> PrepareAbort must still complete
+    val transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareAbort, time.milliseconds() - 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(PrepareAbort, txnMetadata.state)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringCompleteCommit(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = PrepareCommit,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // simulate a backwards clock shift (update time 1 ms before time.milliseconds()); PrepareCommit -> CompleteCommit must still complete
+    val transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(CompleteCommit, txnMetadata.state)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringCompleteAbort(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = PrepareAbort,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // simulate a backwards clock shift (update time 1 ms before time.milliseconds()); PrepareAbort -> CompleteAbort must still complete
+    val transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(CompleteAbort, txnMetadata.state)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
   def testFenceProducerAfterEpochsExhausted(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = (Short.MaxValue - 1).toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -131,8 +316,6 @@ class TransactionMetadataTest {
 
   @Test(expected = classOf[IllegalStateException])
   def testFenceProducerNotAllowedIfItWouldOverflow(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = Short.MaxValue
 
     val txnMetadata = new TransactionMetadata(
@@ -151,8 +334,6 @@ class TransactionMetadataTest {
 
   @Test
   def testRotateProducerId(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = (Short.MaxValue - 1).toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -192,8 +373,6 @@ class TransactionMetadataTest {
 
   @Test
   def testAttemptedEpochBumpWithNewlyCreatedMetadata(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -217,8 +396,6 @@ class TransactionMetadataTest {
 
   @Test
   def testEpochBumpWithCurrentEpochProvided(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -242,8 +419,6 @@ class TransactionMetadataTest {
 
   @Test
   def testAttemptedEpochBumpWithLastEpoch(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
     val lastProducerEpoch = (producerEpoch - 1).toShort
 
@@ -268,8 +443,6 @@ class TransactionMetadataTest {
 
   @Test
   def testAttemptedEpochBumpWithFencedEpoch(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
     val lastProducerEpoch = (producerEpoch - 1).toShort
 
@@ -290,8 +463,6 @@ class TransactionMetadataTest {
   }
 
   private def testRotateProducerIdInOngoingState(state: TransactionState): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = (Short.MaxValue - 1).toShort
 
     val txnMetadata = new TransactionMetadata(