You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:32 UTC

[pulsar] 27/29: [improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e83c26efcfce5d0d10464a281240feb3e30ab8a2
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 28 11:29:57 2022 +0800

    [improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241)
    
    ### Motivation
    
    Fix the CPU consumption when there are many consumers (> 100k) and the dispatch rate limit is enabled.
    
    ![image](https://user-images.githubusercontent.com/12592133/175940861-7be13d62-042d-46b9-923d-3b1e8354d331.png)
    
    [broker_perf.html.txt](https://github.com/apache/pulsar/files/8991916/broker_perf.html.txt)
    
    ### Modification
    
    - Added `isRescheduleReadInProgress` to ensure the dispatcher only has one pending re-schedule read task at a time.
    - Added a DEBUG log for the re-schedule read operation.
    
    (cherry picked from commit eec46ddcba4d2b4f956e1b4d63154cc43087f507)
---
 .../PersistentDispatcherMultipleConsumers.java           | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0a6cf8e02a9..f77a55338f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -105,6 +106,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     "blockedDispatcherOnUnackedMsgs");
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
+    private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
+
     protected enum ReadType {
         Normal, Replay
     }
@@ -290,8 +293,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     @Override
     protected void reScheduleRead() {
-        topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
-                TimeUnit.MILLISECONDS);
+        if (isRescheduleReadInProgress.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
+            }
+            topic.getBrokerService().executor().schedule(
+                    () -> {
+                        isRescheduleReadInProgress.set(false);
+                        readMoreEntries();
+                        },
+                    MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+        }
     }
 
     // left pair is messagesToRead, right pair is bytesToRead