You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/08 14:16:57 UTC
[pulsar] branch branch-2.11 updated: Use `safeRun` to log thread exception. (#17484)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 8c148684fbc Use `safeRun` to log thread exception. (#17484)
8c148684fbc is described below
commit 8c148684fbc1e1a5d0fc1cf73873f79c0bd6323b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Sep 8 21:32:21 2022 +0800
Use `safeRun` to log thread exception. (#17484)
---
.../persistent/PersistentDispatcherMultipleConsumers.java | 6 +++---
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 11 ++++++-----
.../PersistentStreamingDispatcherMultipleConsumers.java | 2 +-
3 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 08257e909a6..0e68186490f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -221,9 +221,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
- topic.getBrokerService().executor().execute(() -> {
+ topic.getBrokerService().executor().execute(safeRun(() -> {
internalConsumerFlow(consumer, additionalNumberOfMessages);
- });
+ }));
}
private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
@@ -249,7 +249,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
*
*/
public void readMoreEntriesAsync() {
- topic.getBrokerService().executor().execute(this::readMoreEntries);
+ topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
}
public synchronized void readMoreEntries() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index e42995e9247..5573596a96e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
@@ -330,14 +331,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// readMoreEntries should run regardless whether or not stuck is caused by
// stuckConsumers for avoid stopping dispatch.
sendInProgress = false;
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
} else if (currentThreadKeyNumber == 0) {
sendInProgress = false;
- topic.getBrokerService().executor().schedule(() -> {
+ topic.getBrokerService().executor().schedule(safeRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
readMoreEntries();
}
- }, 100, TimeUnit.MILLISECONDS);
+ }), 100, TimeUnit.MILLISECONDS);
}
return false;
}
@@ -411,7 +412,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
public void markDeletePositionMoveForward() {
// Execute the notification in different thread to avoid a mutex chain here
// from the delete operation that was completed
- topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
+ topic.getBrokerService().getTopicOrderedExecutor().execute(safeRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()
&& removeConsumersFromRecentJoinedConsumers()) {
@@ -420,7 +421,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
readMoreEntries();
}
}
- });
+ }));
}
private boolean removeConsumersFromRecentJoinedConsumers() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 28e3ecd11b4..9eb0a169964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -192,7 +192,7 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
havePendingReplayRead = false;
// We should not call readMoreEntries() recursively in the same thread
// as there is a risk of StackOverflowError
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,