You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/11 22:27:39 UTC
[pulsar] branch branch-2.8 updated: fix flaky in step2 and step3 (#12954)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 06219e2 fix flaky in step2 and step3 (#12954)
06219e2 is described below
commit 06219e2261f345fb16a7a4685e79532f0ccee97b
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Nov 24 15:31:39 2021 +0800
fix flaky in step2 and step3 (#12954)
(cherry picked from commit 94736a43f1b9a6d1db75936032b94eb9a11b9c0d)
---
.../client/api/DispatcherBlockConsumerTest.java | 27 ++++++++++------------
1 file changed, 12 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index d43a759..00f52ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -25,7 +25,11 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.List;
@@ -41,8 +45,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-
-
import lombok.Cleanup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
@@ -50,8 +52,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,12 +63,6 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-
@Test(groups = "flaky")
public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);
@@ -688,6 +684,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
try {
+ final int waitMills = 500;
final int maxUnAckPerBroker = 200;
final double unAckMsgPercentagePerDispatcher = 10;
int maxUnAckPerDispatcher = (int) ((maxUnAckPerBroker * unAckMsgPercentagePerDispatcher) / 100); // 200 *
@@ -745,7 +742,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
Message<byte[]> msg = null;
Set<MessageId> messages1 = Sets.newHashSet();
for (int j = 0; j < totalProducedMsgs; j++) {
- msg = consumer1Sub1.receive(100, TimeUnit.MILLISECONDS);
+ msg = consumer1Sub1.receive(waitMills, TimeUnit.MILLISECONDS);
if (msg != null) {
messages1.add(msg.getMessageId());
} else {
@@ -754,7 +751,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
// once consumer receives maxUnAckPerBroker-msgs then sleep to give a chance to scheduler to block the
// subscription
if (j == maxUnAckPerBroker) {
- Thread.sleep(200);
+ Thread.sleep(waitMills);
}
}
// client must receive number of messages = maxUnAckPerbroker rather all produced messages
@@ -767,7 +764,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumer2Msgs = 0;
for (int j = 0; j < totalProducedMsgs; j++) {
- msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS);
+ msg = consumer2Sub1.receive(waitMills, TimeUnit.MILLISECONDS);
if (msg != null) {
consumer2Msgs++;
} else {
@@ -792,7 +789,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Set<MessageId> messages2 = Sets.newHashSet();
for (int j = 0; j < totalProducedMsgs; j++) {
- msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS);
+ msg = consumerSub2.receive(waitMills, TimeUnit.MILLISECONDS);
if (msg != null) {
messages2.add(msg.getMessageId());
} else {
@@ -809,7 +806,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumedMsgsSub3 = 0;
for (int j = 0; j < totalProducedMsgs; j++) {
- msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS);
+ msg = consumer1Sub3.receive();
if (msg != null) {
consumedMsgsSub3++;
consumer1Sub3.acknowledge(msg);