You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/07 07:41:34 UTC

[pulsar] branch fix_stack_over_flow created (now 5ce22d83099)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a change to branch fix_stack_over_flow
in repository https://gitbox.apache.org/repos/asf/pulsar.git


      at 5ce22d83099 Prevent `StackOverFlowException` in SHARED subscription.

This branch includes the following new commits:

     new 5ce22d83099 Prevent `StackOverFlowException` in SHARED subscription.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[pulsar] 01/01: Prevent `StackOverFlowException` in SHARED subscription.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch fix_stack_over_flow
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ce22d830998a2cf137a75347e98d213e6e55d93
Author: mattison chao <ma...@sn-mac.local>
AuthorDate: Sun Aug 7 15:38:50 2022 +0800

    Prevent `StackOverFlowException` in SHARED subscription.
---
 .../PersistentDispatcherMultipleConsumers.java     | 37 +++++++++++++---------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a02b76c9aed..c2ce6b83852 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
-    protected boolean sendInProgress;
+    protected volatile boolean sendInProgress;
     protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -244,6 +244,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         readMoreEntries();
     }
 
+    /**
+     * We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
+     *
+     */
+    public void readMoreEntiresAsync() {
+        topic.getBrokerService().executor().execute(() -> readMoreEntries());
+    }
+
     public synchronized void readMoreEntries() {
         if (sendInProgress) {
             // we cannot read more entries while sending the previous batch
@@ -287,9 +295,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // next entries as readCompletedEntries-callback was never called
                 if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
-                    // We should not call readMoreEntries() recursively in the same thread
-                    // as there is a risk of StackOverflowError
-                    topic.getBrokerService().executor().execute(() -> readMoreEntries());
+                    readMoreEntiresAsync();
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 if (log.isDebugEnabled()) {
@@ -544,24 +550,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
             // setting sendInProgress here, because sendMessagesToConsumers will be executed
             // in a separate thread, and we want to prevent more reads
-            sendInProgress = true;
-            dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
+            dispatchMessagesThread.execute(safeRun(() -> {
+                if (sendMessagesToConsumers(readType, entries)) {
+                    readMoreEntries();
+                }
+            }));
         } else {
-            sendMessagesToConsumers(readType, entries);
+            if (sendMessagesToConsumers(readType, entries)) {
+                readMoreEntiresAsync();
+            };
         }
     }
 
-    protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+    protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
         sendInProgress = true;
-        boolean readMoreEntries;
         try {
-            readMoreEntries = trySendMessagesToConsumers(readType, entries);
+            return trySendMessagesToConsumers(readType, entries);
         } finally {
             sendInProgress = false;
         }
-        if (readMoreEntries) {
-            readMoreEntries();
-        }
     }
 
     /**
@@ -916,7 +923,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
                 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
             log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
-            topic.getBrokerService().executor().execute(() -> readMoreEntries());
+            readMoreEntiresAsync();
         }
 
         int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
@@ -939,7 +946,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             // unblock dispatcher if it acks back enough messages
             if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
                 log.info("[{}] Dispatcher is unblocked", name);
-                topic.getBrokerService().executor().execute(() -> readMoreEntries());
+                readMoreEntiresAsync();
             }
         }
         // increment broker-level count