You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/06 08:17:18 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode. (#17237)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 390a4ed713f [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode. (#17237)
390a4ed713f is described below

commit 390a4ed713fed407a5ab95bdc4eb8e3f2d73be99
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Wed Aug 24 19:50:06 2022 +0800

    [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode. (#17237)
    
    (cherry picked from commit 0517423b0a8d9c981cc5550abfec9e60b55bf3e7)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 71 ++++++++++--------
 ...entStreamingDispatcherSingleActiveConsumer.java | 54 +++++++++-----
 .../SubscriptionMessageDispatchThrottlingTest.java | 86 ++++++++++++++++++++++
 3 files changed, 161 insertions(+), 50 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 1f2636fbe4c..446a19de475 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -58,6 +59,7 @@ import org.slf4j.LoggerFactory;
 public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
         implements Dispatcher, ReadEntriesCallback {
 
+    private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
     protected final PersistentTopic topic;
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
@@ -242,14 +244,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                         SafeRun.safeRun(() -> {
                             synchronized (PersistentDispatcherSingleActiveConsumer.this) {
                                 Consumer newConsumer = getActiveConsumer();
-                                if (newConsumer != null && !havePendingRead) {
-                                    readMoreEntries(newConsumer);
-                                } else {
-                                    log.debug(
-                                            "[{}-{}] Ignoring write future complete."
-                                                    + " consumerAvailable={} havePendingRead={}",
-                                            name, newConsumer, newConsumer != null, havePendingRead);
-                                }
+                                readMoreEntries(newConsumer);
                             }
                         }));
                 }
@@ -336,25 +331,40 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         // consumer can be null when all consumers are disconnected from broker.
         // so skip reading more entries if currently there is no active consumer.
         if (null == consumer) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
+            }
+            return;
+        }
+        if (havePendingRead) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+            }
             return;
         }
 
         if (consumer.getAvailablePermits() > 0) {
-            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
-            int messagesToRead = calculateResult.getLeft();
-            long bytesToRead = calculateResult.getRight();
+            synchronized (this) {
+                if (havePendingRead) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+                    }
+                    return;
+                }
 
-            if (-1 == messagesToRead || bytesToRead == -1) {
-                // Skip read as topic/dispatcher has exceed the dispatch rate.
-                return;
-            }
+                Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+                int messagesToRead = calculateResult.getLeft();
+                long bytesToRead = calculateResult.getRight();
 
-            // Schedule read
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
-            }
+                if (-1 == messagesToRead || bytesToRead == -1) {
+                    // Skip read as topic/dispatcher has exceeded the dispatch rate.
+                    return;
+                }
 
-            synchronized (this) {
+                // Schedule read
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
+                }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
                     topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
@@ -375,19 +385,16 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
     @Override
     protected void reScheduleRead() {
-        topic.getBrokerService().executor().schedule(() -> {
-            Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-            if (currentConsumer != null && !havePendingRead) {
-                readMoreEntries(currentConsumer);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
-                                    + " havePendingRead {}",
-                            topic.getName(), currentConsumer, havePendingRead);
-                }
+        if (isRescheduleReadInProgress.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
             }
-        }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-
+            topic.getBrokerService().executor().schedule(() -> {
+                isRescheduleReadInProgress.set(false);
+                Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                readMoreEntries(currentConsumer);
+            }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+        }
     }
 
     protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 01ef7216d20..2048bb016b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -179,31 +179,49 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
         // consumer can be null when all consumers are disconnected from broker.
         // so skip reading more entries if currently there is no active consumer.
         if (null == consumer) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
+            }
+            return;
+        }
+        if (havePendingRead) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+            }
             return;
         }
 
-        if (!havePendingRead && consumer.getAvailablePermits() > 0) {
-            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
-            int messagesToRead = calculateResult.getLeft();
-            long bytesToRead = calculateResult.getRight();
+        if (consumer.getAvailablePermits() > 0) {
+            synchronized (this) {
+                if (havePendingRead) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+                    }
+                    return;
+                }
+
+                Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+                int messagesToRead = calculateResult.getLeft();
+                long bytesToRead = calculateResult.getRight();
 
 
-            if (-1 == messagesToRead || bytesToRead == -1) {
-                // Skip read as topic/dispatcher has exceed the dispatch rate.
-                return;
-            }
+                if (-1 == messagesToRead || bytesToRead == -1) {
+                    // Skip read as topic/dispatcher has exceeded the dispatch rate.
+                    return;
+                }
 
-            // Schedule read
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
-            }
-            havePendingRead = true;
+                // Schedule read
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
+                }
+                havePendingRead = true;
 
-            if (consumer.readCompacted()) {
-                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
-                        this, consumer);
-            } else {
-                streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
+                if (consumer.readCompacted()) {
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+                            this, consumer);
+                } else {
+                    streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
+                }
             }
         } else {
             if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 9e65eecb8ee..3d1060a406c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -220,6 +220,92 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 ms to check whether more than 30 messages are received.
+        Thread.sleep(2000);
+        // If this assertion fails, we likely have a regression that causes messages to be dispatched more than once.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, true);
+        admin.namespaces().deleteNamespace(namespace);
+    }
+
     /**
      * verify rate-limiting should throttle message-dispatching based on byte-rate
      *