You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/28 09:56:25 UTC

[pulsar] branch master updated: [fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b7b923933a5 [fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)
b7b923933a5 is described below

commit b7b923933a5c7594a52c8bb761f80e25ca0065cb
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Oct 28 17:56:16 2022 +0800

    [fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)
---
 .../org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index d6a7f3b134d..ec24c066f36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -618,12 +618,13 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
         }
         latch.await();
         // (2) consume all messages except: unackMessages-set
-        Set<Integer> unackMessages = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
+        Set<Integer> unackMsgIndex = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
+        Set<String> unackMsgs = unackMsgIndex.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
         int receivedMsgCount = 0;
         for (int i = 0; i < totalProducedMsgs; i++) {
             Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
             assertNotNull(msg);
-            if (!unackMessages.contains(i)) {
+            if (!unackMsgs.contains(new String(msg.getData()))) {
                 consumer.acknowledge(msg);
             }
             receivedMsgCount++;
@@ -646,7 +647,6 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                 .subscriptionType(SubscriptionType.Shared).subscribe();
 
         // consumer should only receive unakced messages
-        Set<String> unackMsgs = unackMessages.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
         Set<String> receivedMsgs = new HashSet<>();
         for (int i = 0; i < totalProducedMsgs; i++) {
             Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);