You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/11 22:27:39 UTC

[pulsar] branch branch-2.8 updated: fix flaky in step2 and step3 (#12954)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 06219e2  fix flaky in step2 and step3 (#12954)
06219e2 is described below

commit 06219e2261f345fb16a7a4685e79532f0ccee97b
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Nov 24 15:31:39 2021 +0800

    fix flaky in step2 and step3 (#12954)
    
    (cherry picked from commit 94736a43f1b9a6d1db75936032b94eb9a11b9c0d)
---
 .../client/api/DispatcherBlockConsumerTest.java    | 27 ++++++++++------------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index d43a759..00f52ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -25,7 +25,11 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.Iterator;
 import java.util.List;
@@ -41,8 +45,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-
-
 import lombok.Cleanup;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -50,8 +52,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,12 +63,6 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-
 @Test(groups = "flaky")
 public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);
@@ -688,6 +684,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
         ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
 
         try {
+            final int waitMills = 500;
             final int maxUnAckPerBroker = 200;
             final double unAckMsgPercentagePerDispatcher = 10;
             int maxUnAckPerDispatcher = (int) ((maxUnAckPerBroker * unAckMsgPercentagePerDispatcher) / 100); // 200 *
@@ -745,7 +742,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             Message<byte[]> msg = null;
             Set<MessageId> messages1 = Sets.newHashSet();
             for (int j = 0; j < totalProducedMsgs; j++) {
-                msg = consumer1Sub1.receive(100, TimeUnit.MILLISECONDS);
+                msg = consumer1Sub1.receive(waitMills, TimeUnit.MILLISECONDS);
                 if (msg != null) {
                     messages1.add(msg.getMessageId());
                 } else {
@@ -754,7 +751,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                 // once consumer receives maxUnAckPerBroker-msgs then sleep to give a chance to scheduler to block the
                 // subscription
                 if (j == maxUnAckPerBroker) {
-                    Thread.sleep(200);
+                    Thread.sleep(waitMills);
                 }
             }
             // client must receive number of messages = maxUnAckPerbroker rather all produced messages
@@ -767,7 +764,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                     .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
             int consumer2Msgs = 0;
             for (int j = 0; j < totalProducedMsgs; j++) {
-                msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS);
+                msg = consumer2Sub1.receive(waitMills, TimeUnit.MILLISECONDS);
                 if (msg != null) {
                     consumer2Msgs++;
                 } else {
@@ -792,7 +789,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                     .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
             Set<MessageId> messages2 = Sets.newHashSet();
             for (int j = 0; j < totalProducedMsgs; j++) {
-                msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS);
+                msg = consumerSub2.receive(waitMills, TimeUnit.MILLISECONDS);
                 if (msg != null) {
                     messages2.add(msg.getMessageId());
                 } else {
@@ -809,7 +806,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                     .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
             int consumedMsgsSub3 = 0;
             for (int j = 0; j < totalProducedMsgs; j++) {
-                msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS);
+                msg = consumer1Sub3.receive();
                 if (msg != null) {
                     consumedMsgsSub3++;
                     consumer1Sub3.acknowledge(msg);