You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/08 14:16:57 UTC

[pulsar] branch branch-2.11 updated: Use `safeRun` to log thread exception. (#17484)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8c148684fbc Use `safeRun` to log thread exception. (#17484)
8c148684fbc is described below

commit 8c148684fbc1e1a5d0fc1cf73873f79c0bd6323b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Sep 8 21:32:21 2022 +0800

    Use `safeRun` to log thread exception. (#17484)
---
 .../persistent/PersistentDispatcherMultipleConsumers.java     |  6 +++---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java       | 11 ++++++-----
 .../PersistentStreamingDispatcherMultipleConsumers.java       |  2 +-
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 08257e909a6..0e68186490f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -221,9 +221,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     @Override
     public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
-        topic.getBrokerService().executor().execute(() -> {
+        topic.getBrokerService().executor().execute(safeRun(() -> {
             internalConsumerFlow(consumer, additionalNumberOfMessages);
-        });
+        }));
     }
 
     private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
@@ -249,7 +249,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
      *
      */
     public void readMoreEntriesAsync() {
-        topic.getBrokerService().executor().execute(this::readMoreEntries);
+        topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
     }
 
     public synchronized void readMoreEntries() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index e42995e9247..5573596a96e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -330,14 +331,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             // readMoreEntries should run regardless whether or not stuck is caused by
             // stuckConsumers for avoid stopping dispatch.
             sendInProgress = false;
-            topic.getBrokerService().executor().execute(() -> readMoreEntries());
+            topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
         }  else if (currentThreadKeyNumber == 0) {
             sendInProgress = false;
-            topic.getBrokerService().executor().schedule(() -> {
+            topic.getBrokerService().executor().schedule(safeRun(() -> {
                 synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
                     readMoreEntries();
                 }
-            }, 100, TimeUnit.MILLISECONDS);
+            }), 100, TimeUnit.MILLISECONDS);
         }
         return false;
     }
@@ -411,7 +412,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     public void markDeletePositionMoveForward() {
         // Execute the notification in different thread to avoid a mutex chain here
         // from the delete operation that was completed
-        topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
+        topic.getBrokerService().getTopicOrderedExecutor().execute(safeRun(() -> {
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
                 if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()
                         && removeConsumersFromRecentJoinedConsumers()) {
@@ -420,7 +421,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                     readMoreEntries();
                 }
             }
-        });
+        }));
     }
 
     private boolean removeConsumersFromRecentJoinedConsumers() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 28e3ecd11b4..9eb0a169964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -192,7 +192,7 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
                     havePendingReplayRead = false;
                     // We should not call readMoreEntries() recursively in the same thread
                     // as there is a risk of StackOverflowError
-                    topic.getBrokerService().executor().execute(() -> readMoreEntries());
+                    topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,