You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/06 08:17:18 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode. (#17237)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 390a4ed713f [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode. (#17237)
390a4ed713f is described below
commit 390a4ed713fed407a5ab95bdc4eb8e3f2d73be99
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Wed Aug 24 19:50:06 2022 +0800
[fix][broker] Fix dispatch duplicated messages with `Exclusive` mode. (#17237)
(cherry picked from commit 0517423b0a8d9c981cc5550abfec9e60b55bf3e7)
---
.../PersistentDispatcherSingleActiveConsumer.java | 71 ++++++++++--------
...entStreamingDispatcherSingleActiveConsumer.java | 54 +++++++++-----
.../SubscriptionMessageDispatchThrottlingTest.java | 86 ++++++++++++++++++++++
3 files changed, 161 insertions(+), 50 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 1f2636fbe4c..446a19de475 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -58,6 +59,7 @@ import org.slf4j.LoggerFactory;
public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
implements Dispatcher, ReadEntriesCallback {
+ private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final PersistentTopic topic;
protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
@@ -242,14 +244,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
SafeRun.safeRun(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
- if (newConsumer != null && !havePendingRead) {
- readMoreEntries(newConsumer);
- } else {
- log.debug(
- "[{}-{}] Ignoring write future complete."
- + " consumerAvailable={} havePendingRead={}",
- name, newConsumer, newConsumer != null, havePendingRead);
- }
+ readMoreEntries(newConsumer);
}
}));
}
@@ -336,25 +331,40 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
// consumer can be null when all consumers are disconnected from broker.
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
+ }
+ return;
+ }
+ if (havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+ }
return;
}
if (consumer.getAvailablePermits() > 0) {
- Pair<Integer, Long> calculateResult = calculateToRead(consumer);
- int messagesToRead = calculateResult.getLeft();
- long bytesToRead = calculateResult.getRight();
+ synchronized (this) {
+ if (havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+ }
+ return;
+ }
- if (-1 == messagesToRead || bytesToRead == -1) {
- // Skip read as topic/dispatcher has exceed the dispatch rate.
- return;
- }
+ Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+ int messagesToRead = calculateResult.getLeft();
+ long bytesToRead = calculateResult.getRight();
- // Schedule read
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
- }
+ if (-1 == messagesToRead || bytesToRead == -1) {
+ // Skip read as topic/dispatcher has exceed the dispatch rate.
+ return;
+ }
- synchronized (this) {
+ // Schedule read
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
+ }
havePendingRead = true;
if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
@@ -375,19 +385,16 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
@Override
protected void reScheduleRead() {
- topic.getBrokerService().executor().schedule(() -> {
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- if (currentConsumer != null && !havePendingRead) {
- readMoreEntries(currentConsumer);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
- + " havePendingRead {}",
- topic.getName(), currentConsumer, havePendingRead);
- }
+ if (isRescheduleReadInProgress.compareAndSet(false, true)) { // schedule only when the flag was false, i.e. no delayed read is already queued
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
}
- }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-
+ topic.getBrokerService().executor().schedule(() -> { // delayed task, runs after MESSAGE_RATE_BACKOFF_MS milliseconds
+ isRescheduleReadInProgress.set(false); // flag is cleared when the delayed task runs, before the read attempt
+ Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ readMoreEntries(currentConsumer); // the null-consumer and pending-read checks now live inside readMoreEntries
+ }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+ }
}
protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 01ef7216d20..2048bb016b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -179,31 +179,49 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
// consumer can be null when all consumers are disconnected from broker.
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
+ }
+ return;
+ }
+ if (havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+ }
return;
}
- if (!havePendingRead && consumer.getAvailablePermits() > 0) {
- Pair<Integer, Long> calculateResult = calculateToRead(consumer);
- int messagesToRead = calculateResult.getLeft();
- long bytesToRead = calculateResult.getRight();
+ if (consumer.getAvailablePermits() > 0) {
+ synchronized (this) {
+ if (havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+ }
+ return;
+ }
+
+ Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+ int messagesToRead = calculateResult.getLeft();
+ long bytesToRead = calculateResult.getRight();
- if (-1 == messagesToRead || bytesToRead == -1) {
- // Skip read as topic/dispatcher has exceed the dispatch rate.
- return;
- }
+ if (-1 == messagesToRead || bytesToRead == -1) {
+ // Skip read as topic/dispatcher has exceed the dispatch rate.
+ return;
+ }
- // Schedule read
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
- }
- havePendingRead = true;
+ // Schedule read
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
+ }
+ havePendingRead = true;
- if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
- this, consumer);
- } else {
- streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
+ if (consumer.readCompacted()) {
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+ this, consumer);
+ } else {
+ streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
+ }
}
} else {
if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 9e65eecb8ee..3d1060a406c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -220,6 +220,92 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
log.info("-- Exiting {} test --", methodName);
}
+ @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15) // NOTE(review): the method below is private — confirm the test framework actually runs it
+ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception { // sends 30 messages under byte-rate throttling and checks how many the listener receives
+ int brokerRate = 1000; // applied below as the broker-level dispatchThrottlingRateInByte
+ int topicRate = 5000; // byte rate of the namespace dispatch rate set below
+ int subRate = 10000; // byte rate of the namespace subscription dispatch rate set below
+ int expectRate = 1000; // only used to size the payload: expectRate / 10 bytes per message
+ final String namespace = "my-property/throttling_ns_non_dup";
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+ final String subName = "my-subscriber-name-" + subscription;
+
+ DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(subRate)
+ .ratePeriodInSecond(1)
+ .build();
+ DispatchRate topicDispatchRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(topicRate)
+ .ratePeriodInSecond(1)
+ .build();
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+ admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+ final int numProducedMessages = 30;
+ final CountDownLatch latch = new CountDownLatch(numProducedMessages); // counted down once per message seen by the listener
+ final AtomicInteger totalReceived = new AtomicInteger(0); // incremented once per message seen by the listener
+ // Enable dispatch throttling on non-backlog consumers.
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(subscription).messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in the listener", receivedMessage);
+ totalReceived.incrementAndGet();
+ latch.countDown();
+ }).subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+ DispatchRateLimiter subRateLimiter = null;
+ Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+ if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+ subRateLimiter = subDispatcher.getRateLimiter().get();
+ } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+ subRateLimiter = subDispatcher.getRateLimiter().get(); // same call as the branch above; the branches differ only in the type check
+ } else {
+ Assert.fail("Should only have PersistentDispatcher in this test");
+ }
+ final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+ Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> { // wait up to 500 ms for the broker, topic and subscription limiters to report a positive byte rate
+ DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+ Assert.assertTrue(brokerDispatchRateLimiter != null
+ && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+ Assert.assertTrue(topicDispatchRateLimiter != null
+ && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ Assert.assertTrue(subDispatchRateLimiter != null
+ && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ });
+
+ Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+ .getDispatchThrottlingRateInByte(), subRate);
+ Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+ .getDispatchThrottlingRateInByte(), topicRate);
+
+ for (int i = 0; i < numProducedMessages; i++) {
+ producer.send(new byte[expectRate / 10]); // 100-byte payload per message
+ }
+
+ latch.await();
+ // Wait 2000 ms to see whether more than 30 messages (i.e. duplicates) are delivered.
+ Thread.sleep(2000);
+ // If this assertion fails, message dispatch may have regressed and be delivering duplicates.
+ Assert.assertEquals(totalReceived.get(), numProducedMessages, 10); // NOTE(review): the third argument (10) presumably selects a delta overload, tolerating a difference of up to 10 — confirm this is intended
+
+ consumer.close();
+ producer.close();
+ admin.topics().delete(topicName, true);
+ admin.namespaces().deleteNamespace(namespace);
+ }
+
/**
* verify rate-limiting should throttle message-dispatching based on byte-rate
*