You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/02 04:42:06 UTC

[pulsar] 01/02: [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6db6679b419fc0951f42f43331e13cabe90c6130
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Aug 9 10:45:48 2022 +0800

    [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)
    
    * [Fix][Flaky-test] Fix testConsumeTxnMessage
    Master https://github.com/apache/pulsar/issues/14109
    ## Motivation
    The transaction commit is async, so the consumer can still receive message when the consumer rebuilds.
    ## Modification
    Add  Awaitility.await() for check-ing whether the ongoingTxns = 0.
    
    (cherry picked from commit c29503e7c8704132f13ae8021a76735b065940b9)
---
 .../apache/pulsar/testclient/PerformanceTransactionTest.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index 936f70bedf3..1fcb925a5e2 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -194,7 +195,7 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException {
+    public void testConsumeTxnMessage() throws Exception {
         String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
         String subName = "sub";
         String topic = testTopic + UUID.randomUUID();
@@ -222,11 +223,18 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
         });
         thread.start();
         thread.join();
+
+        Awaitility.await().untilAsserted(() -> {
+            admin.transactions().getCoordinatorStats().forEach((integer, transactionCoordinatorStats) -> {
+                Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
+            });
+        });
+
         Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionType(SubscriptionType.Exclusive)
                 .enableBatchIndexAcknowledgment(false)
-               .subscribe();
+                .subscribe();
         for (int i = 0; i < 5; i++) {
             Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
             Assert.assertNotNull(message);