You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/07 07:41:35 UTC
[pulsar] 01/01: Prevent `StackOverFlowException` in SHARED subscription.
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch fix_stack_over_flow
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5ce22d830998a2cf137a75347e98d213e6e55d93
Author: mattison chao <ma...@sn-mac.local>
AuthorDate: Sun Aug 7 15:38:50 2022 +0800
Prevent `StackOverFlowException` in SHARED subscription.
---
.../PersistentDispatcherMultipleConsumers.java | 37 +++++++++++++---------
1 file changed, 22 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a02b76c9aed..c2ce6b83852 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
- protected boolean sendInProgress;
+ protected volatile boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -244,6 +244,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
readMoreEntries();
}
+ /**
+ * Schedules readMoreEntries() on the broker service executor instead of invoking it inline:
+ * calling readMoreEntries() recursively in the same thread risks a StackOverflowError.
+ */
+ public void readMoreEntiresAsync() {
+ topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ }
+
public synchronized void readMoreEntries() {
if (sendInProgress) {
// we cannot read more entries while sending the previous batch
@@ -287,9 +295,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
- // We should not call readMoreEntries() recursively in the same thread
- // as there is a risk of StackOverflowError
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
if (log.isDebugEnabled()) {
@@ -544,24 +550,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
// setting sendInProgress here, because sendMessagesToConsumers will be executed
// in a separate thread, and we want to prevent more reads
- sendInProgress = true;
- dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
+ dispatchMessagesThread.execute(safeRun(() -> {
+ if (sendMessagesToConsumers(readType, entries)) {
+ readMoreEntries();
+ }
+ }));
} else {
- sendMessagesToConsumers(readType, entries);
+ if (sendMessagesToConsumers(readType, entries)) {
+ readMoreEntiresAsync();
+ };
}
}
- protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
- boolean readMoreEntries;
try {
- readMoreEntries = trySendMessagesToConsumers(readType, entries);
+ return trySendMessagesToConsumers(readType, entries);
} finally {
sendInProgress = false;
}
- if (readMoreEntries) {
- readMoreEntries();
- }
}
/**
@@ -916,7 +923,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
@@ -939,7 +946,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
}
// increment broker-level count