Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/22 05:12:19 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #13036: KAFKA-14534: Reduce flakiness in TransactionsExpirationTest

showuon commented on code in PR #13036:
URL: https://github.com/apache/kafka/pull/13036#discussion_r1055097820


##########
core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala:
##########
@@ -128,10 +128,13 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
     producer.flush()
 
+    TestUtils.waitUntilTrue(() => producerState.nonEmpty, s"Producer IDs for topic1 did not propagate quickly")
+
     // Ensure producer IDs are added.
     val pState = producerState
-    assertEquals(1, pState.size)
-    val oldProducerId = pState(0).producerId
+    assertEquals(1, pState.size, s"No producer visible via admin api")

Review Comment:
   I don't think this error message makes sense here, since we already wait until `producerState` is non-empty. If this assertion fails, it will be because the size is something other than 1, not because no producer is visible via the admin API. Does that make sense?
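
   To illustrate the kind of message I have in mind, here is a minimal, self-contained sketch. `ProducerEntry`, `unexpectedCountMessage`, and `checkSingleProducer` are hypothetical names made up for this example, not Kafka or `TestUtils` APIs; the point is only that the message reports the observed count rather than claiming nothing is visible.

```scala
// Hypothetical stand-in for the entries returned by the admin API;
// this is NOT the real Kafka type, just enough structure for the sketch.
final case class ProducerEntry(producerId: Long, producerEpoch: Int)

object AssertionMessageSketch {
  // Builds a failure message that describes what was actually observed.
  def unexpectedCountMessage(entries: Seq[ProducerEntry]): String =
    s"Expected exactly 1 producer for topic1, but found ${entries.size}: ${entries.mkString(", ")}"

  // Returns None when exactly one producer is present,
  // otherwise Some(message) describing the mismatch.
  def checkSingleProducer(entries: Seq[ProducerEntry]): Option[String] =
    if (entries.size == 1) None else Some(unexpectedCountMessage(entries))
}
```

   In the test itself this would probably reduce to something like `assertEquals(1, pState.size, s"Expected exactly 1 producer for topic1, but found ${pState.size}")`, assuming the same three-argument `assertEquals` already used in this diff.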



##########
core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala:
##########
@@ -149,13 +152,18 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "3", "3", willBeCommitted = true))
     producer.commitTransaction()
 
+    TestUtils.waitUntilTrue(() => producerState.nonEmpty, s"Producer IDs for topic1 did not propagate quickly")
+
     // Producer IDs should repopulate.
     val pState2 = producerState
-    assertEquals(1, pState2.size)
-    val newProducerId = pState2(0).producerId
+    assertEquals(1, pState2.size, "No producer visible via admin api")
+    val newProducerId = pState2.head.producerId
+    val newProducerEpoch = pState2.head.producerEpoch
 
-    // Producer IDs should be the same.
+    // Because the transaction IDs outlive the producer IDs, creating a producer with the same transactional id
+    // soon after the first will re-use the same producerId, while bumping the epoch to indicate that they are distinct.
     assertEquals(oldProducerId, newProducerId)
+    assertEquals(oldProducerEpoch + 1, newProducerEpoch)

Review Comment:
   Thanks for adding one more verification and the comment.


