You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/05/11 14:40:39 UTC
[pulsar] branch branch-2.10 updated: [fix][test] Adjust flaky test concurrent consume reconnect (#15544)
This is an automated email from the ASF dual-hosted git repository.
rgao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ec2837a9873 [fix][test] Adjust flaky test concurrent consume reconnect (#15544)
ec2837a9873 is described below
commit ec2837a987388b20a71c41a35420892577c0b1a2
Author: ran <ga...@126.com>
AuthorDate: Wed May 11 20:41:32 2022 +0800
[fix][test] Adjust flaky test concurrent consume reconnect (#15544)
(cherry picked from commit 7070397649bc41fcd5e9070695ab2790228aa9c2)
---
.../apache/pulsar/client/api/SimpleProducerConsumerTest.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index fa1505f4d6f..95324f856da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -819,10 +819,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// This is to test that the flow control counter doesn't get corrupted while concurrent receives during
// reconnections
- @Test(dataProvider = "batch", groups = "quarantine")
+ @Test(timeOut = 100_000, dataProvider = "batch", groups = "quarantine")
public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) throws Exception {
final int recvQueueSize = 100;
final int numConsumersThreads = 10;
+ final int receiveTimeoutSeconds = 100;
String subName = UUID.randomUUID().toString();
final Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -836,12 +837,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
for (int i = 0; i < numConsumersThreads; i++) {
executor.submit((Callable<Void>) () -> {
barrier.await();
- consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ consumer.receive(receiveTimeoutSeconds, TimeUnit.SECONDS);
return null;
});
}
-
- barrier.await();
+ barrier.await(); // the last thread reach barrier, start consume messages
// we restart the broker to reconnect
restartBroker();
@@ -877,7 +877,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
return null;
});
}
- barrier.await();
+ barrier.await(); // the last thread reach barrier, start consume messages
Awaitility.await().untilAsserted(() -> {
// The available permits should be 20 and num messages in the queue should be 80
@@ -907,7 +907,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
return null;
});
}
- barrier.await();
+ barrier.await(); // the last thread reach barrier, start consume messages
restartBroker();