You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/26 08:20:21 UTC

[GitHub] [pulsar] lhotari commented on a diff in pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

lhotari commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r955786926


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,92 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 milli sec to check if we can get more than 30 messages.
+        Thread.sleep(2000);
+        // If this assertion failed, please alert we may have some regression cause message dispatch was duplicated.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);

Review Comment:
   This is flaky. Since the comment says "please alert", I'm alerting. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org