You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/10 11:36:30 UTC
[pulsar] 04/05: [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 433e3aff9b2cbdf690ee9de07dad12e5e738ac50
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Aug 9 10:45:48 2022 +0800
[Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)
* [Fix][Flaky-test] Fix testConsumeTxnMessage
Master https://github.com/apache/pulsar/issues/14109
## Motivation
The transaction commit is asynchronous, so the consumer can still receive messages when it is rebuilt.
## Modification
Add Awaitility.await() to wait until the number of ongoing transactions (ongoingTxnSize) is 0.
---
.../apache/pulsar/testclient/PerformanceTransactionTest.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index b87cf3adda3..883d53540cb 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -196,7 +197,7 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
}
@Test
- public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException {
+ public void testConsumeTxnMessage() throws Exception {
String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
String subName = "sub";
String topic = testTopic + UUID.randomUUID();
@@ -224,11 +225,18 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
});
thread.start();
thread.join();
+
+ Awaitility.await().untilAsserted(() -> {
+ admin.transactions().getCoordinatorStats().forEach((integer, transactionCoordinatorStats) -> {
+ Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
+ });
+ });
+
Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
.enableBatchIndexAcknowledgment(false)
- .subscribe();
+ .subscribe();
for (int i = 0; i < 5; i++) {
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);