You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/17 21:41:10 UTC
[kafka] branch 2.5 updated: KAFKA-8803: Remove timestamp check in
completeTransitionTo (#8278)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 9d68b8e KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)
9d68b8e is described below
commit 9d68b8e3db2df135c799fc9523c99570d1ed6a26
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 17 14:40:02 2020 -0700
KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)
In prepareAddPartitions, the txnStartTimestamp could be updated to updateTimestamp, which is assumed to always be larger than the original startTimestamp. However, due to an NTP time shift the clock may go backwards, so the newStartTimestamp can be smaller than the original one. Later, in completeTransitionTo, the timestamp check would then fail with an IllegalStateException, and the txn would not transition to Ongoing.
An indirect result of this is that such a txn would NEVER expire, because only Ongoing transactions are checked for expiration.
We should do the same as in #3286 and remove this check.
Also added test coverage for both KAFKA-5415 and KAFKA-8803.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../transaction/TransactionMetadata.scala | 3 +-
.../transaction/TransactionMetadataTest.scala | 215 ++++++++++++++++++---
2 files changed, 194 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 4b57abf..24b418a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -378,8 +378,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
case Ongoing => // from addPartitions
if (!validProducerEpoch(transitMetadata) ||
!topicPartitions.subsetOf(transitMetadata.topicPartitions) ||
- txnTimeoutMs != transitMetadata.txnTimeoutMs ||
- txnStartTimestamp > transitMetadata.txnStartTimestamp) {
+ txnTimeoutMs != transitMetadata.txnTimeoutMs) {
throwStateTransitionFailure(transitMetadata)
} else {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
index 506e68d..85ee263 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
@@ -17,6 +17,7 @@
package kafka.coordinator.transaction
import kafka.utils.MockTime
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.junit.Assert._
@@ -27,11 +28,11 @@ import scala.collection.mutable
class TransactionMetadataTest {
val time = new MockTime()
+ val producerId = 23423L
+ val transactionalId = "txnlId"
@Test
def testInitializeEpoch(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = RecordBatch.NO_PRODUCER_EPOCH
val txnMetadata = new TransactionMetadata(
@@ -55,8 +56,6 @@ class TransactionMetadataTest {
@Test
def testNormalEpochBump(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@@ -79,8 +78,6 @@ class TransactionMetadataTest {
@Test(expected = classOf[IllegalStateException])
def testBumpEpochNotAllowedIfEpochsExhausted(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@@ -99,9 +96,197 @@ class TransactionMetadataTest {
}
@Test
+ def testTolerateUpdateTimeShiftDuringEpochBump(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Empty,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ val transitMetadata = txnMetadata.prepareIncrementProducerEpoch(30000, Option(producerEpoch), time.milliseconds() - 1).right.get
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(producerEpoch + 1, txnMetadata.producerEpoch)
+ assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateUpdateTimeResetDuringProducerIdRotation(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Empty,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ val transitMetadata = txnMetadata.prepareProducerIdRotation(producerId + 1, 30000, time.milliseconds() - 1, recordLastEpoch = true)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(producerId + 1, txnMetadata.producerId)
+ assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
+ assertEquals(0, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringAddPartitions(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Empty,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = time.milliseconds(),
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller; when transting from Empty the start time would be updated to the update-time
+ var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)), txnMetadata.topicPartitions)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+
+ // add another partition, check that in Ongoing state the start timestamp would not change to update time
+ transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds() - 2)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)), txnMetadata.topicPartitions)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 2, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringPrepareCommit(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Ongoing,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ var transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareCommit, time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(PrepareCommit, txnMetadata.state)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringPrepareAbort(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Ongoing,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ var transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareAbort, time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(PrepareAbort, txnMetadata.state)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringCompleteCommit(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = PrepareCommit,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ var transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(CompleteCommit, txnMetadata.state)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringCompleteAbort(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = PrepareAbort,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ var transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(CompleteAbort, txnMetadata.state)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
def testFenceProducerAfterEpochsExhausted(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@@ -131,8 +316,6 @@ class TransactionMetadataTest {
@Test(expected = classOf[IllegalStateException])
def testFenceProducerNotAllowedIfItWouldOverflow(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = Short.MaxValue
val txnMetadata = new TransactionMetadata(
@@ -151,8 +334,6 @@ class TransactionMetadataTest {
@Test
def testRotateProducerId(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@@ -192,8 +373,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithNewlyCreatedMetadata(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@@ -217,8 +396,6 @@ class TransactionMetadataTest {
@Test
def testEpochBumpWithCurrentEpochProvided(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@@ -242,8 +419,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithLastEpoch(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val lastProducerEpoch = (producerEpoch - 1).toShort
@@ -268,8 +443,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithFencedEpoch(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val lastProducerEpoch = (producerEpoch - 1).toShort
@@ -290,8 +463,6 @@ class TransactionMetadataTest {
}
private def testRotateProducerIdInOngoingState(state: TransactionState): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(