You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/07 07:41:35 UTC

[pulsar] 01/01: Prevent `StackOverFlowException` in SHARED subscription.

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch fix_stack_over_flow
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ce22d830998a2cf137a75347e98d213e6e55d93
Author: mattison chao <ma...@sn-mac.local>
AuthorDate: Sun Aug 7 15:38:50 2022 +0800

    Prevent `StackOverFlowException` in SHARED subscription.
---
 .../PersistentDispatcherMultipleConsumers.java     | 37 +++++++++++++---------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a02b76c9aed..c2ce6b83852 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
-    protected boolean sendInProgress;
+    protected volatile boolean sendInProgress;
     protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -244,6 +244,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         readMoreEntries();
     }
 
+    /**
+     * Schedules readMoreEntries() on topic.getBrokerService().executor() instead of invoking it inline, because
+     * recursing in the same thread risks a StackOverflowError. NOTE(review): "Entires" looks like a typo.
+     */
+    public void readMoreEntiresAsync() {
+        topic.getBrokerService().executor().execute(() -> readMoreEntries());
+    }
+
     public synchronized void readMoreEntries() {
         if (sendInProgress) {
             // we cannot read more entries while sending the previous batch
@@ -287,9 +295,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // next entries as readCompletedEntries-callback was never called
                 if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
-                    // We should not call readMoreEntries() recursively in the same thread
-                    // as there is a risk of StackOverflowError
-                    topic.getBrokerService().executor().execute(() -> readMoreEntries());
+                    readMoreEntiresAsync();
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 if (log.isDebugEnabled()) {
@@ -544,24 +550,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
             // setting sendInProgress here, because sendMessagesToConsumers will be executed
             // in a separate thread, and we want to prevent more reads
-            sendInProgress = true;
-            dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
+            dispatchMessagesThread.execute(safeRun(() -> {
+                if (sendMessagesToConsumers(readType, entries)) {
+                    readMoreEntries();
+                }
+            }));
         } else {
-            sendMessagesToConsumers(readType, entries);
+            if (sendMessagesToConsumers(readType, entries)) {
+                readMoreEntiresAsync();
+            };
         }
     }
 
-    protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+    protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
         sendInProgress = true;
-        boolean readMoreEntries;
         try {
-            readMoreEntries = trySendMessagesToConsumers(readType, entries);
+            return trySendMessagesToConsumers(readType, entries);
         } finally {
             sendInProgress = false;
         }
-        if (readMoreEntries) {
-            readMoreEntries();
-        }
     }
 
     /**
@@ -916,7 +923,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
                 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
             log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
-            topic.getBrokerService().executor().execute(() -> readMoreEntries());
+            readMoreEntiresAsync();
         }
 
         int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
@@ -939,7 +946,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             // unblock dispatcher if it acks back enough messages
             if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
                 log.info("[{}] Dispatcher is unblocked", name);
-                topic.getBrokerService().executor().execute(() -> readMoreEntries());
+                readMoreEntiresAsync();
             }
         }
         // increment broker-level count