You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/10 11:29:43 UTC
[pulsar] 03/04: [fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 436ee4bbb0e6a09879a3e09a7018dc05a75c3826
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue Aug 9 01:30:46 2022 +0800
[fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968)
---
.../PersistentDispatcherMultipleConsumers.java | 37 +++++++++++++---------
1 file changed, 22 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a02b76c9aed..4887f6b0541 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
- protected boolean sendInProgress;
+ protected volatile boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -244,6 +244,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
readMoreEntries();
}
+ /**
+ * We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
+ *
+ */
+ public void readMoreEntiresAsync() {
+ topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ }
+
public synchronized void readMoreEntries() {
if (sendInProgress) {
// we cannot read more entries while sending the previous batch
@@ -287,9 +295,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
- // We should not call readMoreEntries() recursively in the same thread
- // as there is a risk of StackOverflowError
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
if (log.isDebugEnabled()) {
@@ -544,24 +550,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
// setting sendInProgress here, because sendMessagesToConsumers will be executed
// in a separate thread, and we want to prevent more reads
- sendInProgress = true;
- dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
+ dispatchMessagesThread.execute(safeRun(() -> {
+ if (sendMessagesToConsumers(readType, entries)) {
+ readMoreEntries();
+ }
+ }));
} else {
- sendMessagesToConsumers(readType, entries);
+ if (sendMessagesToConsumers(readType, entries)) {
+ readMoreEntiresAsync();
+ }
}
}
- protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
- boolean readMoreEntries;
try {
- readMoreEntries = trySendMessagesToConsumers(readType, entries);
+ return trySendMessagesToConsumers(readType, entries);
} finally {
sendInProgress = false;
}
- if (readMoreEntries) {
- readMoreEntries();
- }
}
/**
@@ -916,7 +923,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
@@ -939,7 +946,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
}
// increment broker-level count