You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/06/07 19:52:24 UTC

[kafka] branch 2.3 updated: KAFKA-8003; Fix flaky testFencingOnTransactionExpiration

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f56a103  KAFKA-8003; Fix flaky testFencingOnTransactionExpiration
f56a103 is described below

commit f56a10322a526205846017fee5ee39682efcbb3c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Jun 7 12:51:51 2019 -0700

    KAFKA-8003; Fix flaky testFencingOnTransactionExpiration
    
    We see this failure from time to time:
    ```
    java.lang.AssertionError: expected:<1> but was:<0>
    	at org.junit.Assert.fail(Assert.java:89)
    	at org.junit.Assert.failNotEquals(Assert.java:835)
    	at org.junit.Assert.assertEquals(Assert.java:647)
    	at org.junit.Assert.assertEquals(Assert.java:633)
    	at kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:512)
    ```
    The cause is probably that we are using `consumeRecordsFor`, which has no expectation on the number of records to fetch and uses a timeout of just 1s, so it can return before the record has been fetched. This patch changes the code to use `consumeRecords` with an expected record count and the default 15s timeout.
    
    Note that we have also fixed a bug in the test case itself: the second write was sent to the wrong topic (`topic2` instead of `topic1`), so the test could never have failed in the anticipated way anyway.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Gwen Shapira
    
    Closes #6905 from hachikuji/fix-flaky-transaction-test
    
    (cherry picked from commit bb8de0b8c5f98f7a9d6b5ae7342ba7a0e1af8868)
    Signed-off-by: Gwen Shapira <cs...@gmail.com>
---
 .../test/scala/integration/kafka/api/TransactionsTest.scala    | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 375adaa..13ddd92 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -497,7 +497,7 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     try {
       // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false)).get()
+      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "2", willBeCommitted = false)).get()
       fail("should have raised a ProducerFencedException since the transaction has expired")
     } catch {
       case _: ProducerFencedException =>
@@ -506,9 +506,13 @@ class TransactionsTest extends KafkaServerTestHarness {
     }
 
     // Verify that the first message was aborted and the second one was never written at all.
-    val nonTransactionalConsumer = nonTransactionalConsumers(0)
+    val nonTransactionalConsumer = nonTransactionalConsumers.head
     nonTransactionalConsumer.subscribe(List(topic1).asJava)
-    val records = TestUtils.consumeRecordsFor(nonTransactionalConsumer, 1000)
+
+    // Attempt to consume the one written record. We should not see the second. The
+    // assertion does not strictly guarantee that the record wasn't written, but the
+    // data is small enough that had it been written, it would have been in the first fetch.
+    val records = TestUtils.consumeRecords(nonTransactionalConsumer, numRecords = 1)
     assertEquals(1, records.size)
     assertEquals("1", TestUtils.recordValueAsString(records.head))