You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 03:30:04 UTC
[pulsar] branch master updated: [improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eec46ddcba4 [improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241)
eec46ddcba4 is described below
commit eec46ddcba4d2b4f956e1b4d63154cc43087f507
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 28 11:29:57 2022 +0800
[improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241)
### Motivation
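Reduce the high CPU consumption seen when there are many consumers (> 100k) and the dispatch rate limit is enabled. Previously, every call to `reScheduleRead()` scheduled a new delayed `readMoreEntries()` task, even if one was already pending.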
![image](https://user-images.githubusercontent.com/12592133/175940861-7be13d62-042d-46b9-923d-3b1e8354d331.png)
[broker_perf.html.txt](https://github.com/apache/pulsar/files/8991916/broker_perf.html.txt)
### Modification
- Added `isRescheduleReadInProgress` to ensure the dispatcher only has one pending re-schedule read task at a time.
- Added a DEBUG log for the re-schedule read operation.
---
.../PersistentDispatcherMultipleConsumers.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5637b2d416c..6d6e8a72f00 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -107,6 +108,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
"blockedDispatcherOnUnackedMsgs");
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+ private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
+
protected enum ReadType {
Normal, Replay
}
@@ -294,8 +297,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
@Override
protected void reScheduleRead() {
- topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
- TimeUnit.MILLISECONDS);
+ if (isRescheduleReadInProgress.compareAndSet(false, true)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
+ }
+ topic.getBrokerService().executor().schedule(
+ () -> {
+ isRescheduleReadInProgress.set(false);
+ readMoreEntries();
+ },
+ MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+ }
}
// left pair is messagesToRead, right pair is bytesToRead