You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/04/06 06:55:17 UTC

[kafka] branch 3.3 updated: KAFKA-14880; TransactionMetadata with producer epoch -1 should be expirable (#13499)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 7f242fee15d KAFKA-14880; TransactionMetadata with producer epoch -1 should be expirable (#13499)
7f242fee15d is described below

commit 7f242fee15d1121c581cb18bbbb29fb746d3f350
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Apr 6 08:45:16 2023 +0200

    KAFKA-14880; TransactionMetadata with producer epoch -1 should be expirable (#13499)
    
    We have seen the following error in logs:
    
    ```
    "Mar 22, 2019 @ 21:57:56.655",Error,"kafka-0-0","transaction-log-manager-0","Uncaught exception in scheduled task 'transactionalId-expiration'","java.lang.IllegalArgumentException: Illegal new producer epoch -1
    ```
    
    Investigations showed that it is actually possible for a transaction metadata object to still have -1 as producer epoch when it transitions to Dead.
    
    When a transaction metadata is created for the first time (in handleInitProducerId), it has -1 as its producer epoch. Then a producer epoch is attributed and the transaction coordinator tries to persist the change. If the write fail for instance because there is an under min isr, the transaction metadata remains with its epoch as -1 forever or until the init producer id is retried.
    
    This means that it is possible for transaction metadata to remain with -1 as producer epoch until it gets expired. At the moment, this is not allowed because we enforce a producer epoch greater or equals to 0 in prepareTransitionTo.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Justine Olshan <jo...@confluent.io>
---
 .../transaction/TransactionMetadata.scala          |  5 ++-
 .../transaction/TransactionStateManagerTest.scala  | 49 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 0f6d4b78adf..dc208081d10 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -382,7 +382,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
     if (newProducerId < 0)
       throw new IllegalArgumentException(s"Illegal new producer id $newProducerId")
 
-    if (newEpoch < 0)
+    // The epoch is initialized to NO_PRODUCER_EPOCH when the TransactionMetadata
+    // is created for the first time and it could stay like this until transitioning
+    // to Dead.
+    if (newState != Dead && newEpoch < 0)
       throw new IllegalArgumentException(s"Illegal new producer epoch $newEpoch")
 
     // check that the new state transition is valid and update the pending state if necessary
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 6a56b768c47..24ad8d6550a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -750,6 +750,55 @@ class TransactionStateManagerTest {
     assertEquals(allTransactionalIds, expiredTransactionalIds)
   }
 
+  @Test
+  def testTransactionExpirationShouldNotFailWithUninitializedTransactionMetadata(): Unit = {
+    val partitionIds = 0 until numPartitions
+    val maxBatchSize = 512
+    val transactionalId = "id"
+    val allTransactionalIds = Set(transactionalId)
+
+    loadTransactionsForPartitions(partitionIds)
+
+    // When TransactionMetadata is intialized for the first time, it has the following
+    // shape. Then, the producer id and producer epoch are initialized and we try to
+    // write the change. If the write fails (e.g. under min isr), the TransactionMetadata
+    // is left at it is. If the transactional id is never reused, the TransactionMetadata
+    // will be expired and it should succeed.
+    val txnMetadata = TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = 1,
+      producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = transactionTimeoutMs,
+      state = Empty,
+      timestamp = time.milliseconds()
+    )
+    transactionManager.putTransactionStateIfNotExists(txnMetadata)
+
+    time.sleep(txnConfig.transactionalIdExpirationMs + 1)
+
+    reset(replicaManager)
+    expectLogConfig(partitionIds, maxBatchSize)
+
+    val appendedRecords = mutable.Map.empty[TopicPartition, mutable.Buffer[MemoryRecords]]
+    expectTransactionalIdExpiration(Errors.NONE, appendedRecords)
+
+    transactionManager.removeExpiredTransactionalIds()
+    verify(replicaManager, atLeastOnce()).appendRecords(
+      anyLong(),
+      ArgumentMatchers.eq((-1).toShort),
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(AppendOrigin.Coordinator),
+      any(),
+      any(),
+      any[Option[ReentrantLock]],
+      any(),
+      any()
+    )
+
+    val expiredTransactionalIds = collectTransactionalIdsFromTombstones(appendedRecords)
+    assertEquals(allTransactionalIds, expiredTransactionalIds)
+  }
+
   private def collectTransactionalIdsFromTombstones(
     appendedRecords: mutable.Map[TopicPartition, mutable.Buffer[MemoryRecords]]
   ): Set[String] = {