You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/05/11 14:40:39 UTC

[pulsar] branch branch-2.10 updated: [fix][test] Adjust flaky test concurrent consume reconnect (#15544)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ec2837a9873 [fix][test] Adjust flaky test concurrent consume reconnect (#15544)
ec2837a9873 is described below

commit ec2837a987388b20a71c41a35420892577c0b1a2
Author: ran <ga...@126.com>
AuthorDate: Wed May 11 20:41:32 2022 +0800

    [fix][test] Adjust flaky test concurrent consume reconnect (#15544)
    
    (cherry picked from commit 7070397649bc41fcd5e9070695ab2790228aa9c2)
---
 .../apache/pulsar/client/api/SimpleProducerConsumerTest.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index fa1505f4d6f..95324f856da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -819,10 +819,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
     // This is to test that the flow control counter doesn't get corrupted while concurrent receives during
     // reconnections
-    @Test(dataProvider = "batch", groups = "quarantine")
+    @Test(timeOut = 100_000, dataProvider = "batch", groups = "quarantine")
     public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) throws Exception {
         final int recvQueueSize = 100;
         final int numConsumersThreads = 10;
+        final int receiveTimeoutSeconds = 100;
 
         String subName = UUID.randomUUID().toString();
         final Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -836,12 +837,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         for (int i = 0; i < numConsumersThreads; i++) {
             executor.submit((Callable<Void>) () -> {
                 barrier.await();
-                consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                consumer.receive(receiveTimeoutSeconds, TimeUnit.SECONDS);
                 return null;
             });
         }
-
-        barrier.await();
+        barrier.await(); // the last thread reach barrier, start consume messages
 
         // we restart the broker to reconnect
         restartBroker();
@@ -877,7 +877,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 return null;
             });
         }
-        barrier.await();
+        barrier.await(); // the last thread reach barrier, start consume messages
 
         Awaitility.await().untilAsserted(() -> {
             // The available permits should be 20 and num messages in the queue should be 80
@@ -907,7 +907,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 return null;
             });
         }
-        barrier.await();
+        barrier.await(); // the last thread reach barrier, start consume messages
 
         restartBroker();