You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/28 09:56:25 UTC
[pulsar] branch master updated: [fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b7b923933a5 [fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)
b7b923933a5 is described below
commit b7b923933a5c7594a52c8bb761f80e25ca0065cb
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Oct 28 17:56:16 2022 +0800
[fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)
---
.../org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index d6a7f3b134d..ec24c066f36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -618,12 +618,13 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
}
latch.await();
// (2) consume all messages except: unackMessages-set
- Set<Integer> unackMessages = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
+ Set<Integer> unackMsgIndex = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
+ Set<String> unackMsgs = unackMsgIndex.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
int receivedMsgCount = 0;
for (int i = 0; i < totalProducedMsgs; i++) {
Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
assertNotNull(msg);
- if (!unackMessages.contains(i)) {
+ if (!unackMsgs.contains(new String(msg.getData()))) {
consumer.acknowledge(msg);
}
receivedMsgCount++;
@@ -646,7 +647,6 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
.subscriptionType(SubscriptionType.Shared).subscribe();
// consumer should only receive unakced messages
- Set<String> unackMsgs = unackMessages.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
Set<String> receivedMsgs = new HashSet<>();
for (int i = 0; i < totalProducedMsgs; i++) {
Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);