You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/19 02:40:26 UTC

[pulsar] branch master updated: [fix][metadata] Handle session events in separate thread (#17638)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69f3f7471fa [fix][metadata] Handle session events in separate thread (#17638)
69f3f7471fa is described below

commit 69f3f7471fa6faf24d4776d65e0509538c105d37
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Sep 19 05:40:18 2022 +0300

    [fix][metadata] Handle session events in separate thread (#17638)
---
 .../pulsar/metadata/impl/AbstractMetadataStore.java | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 4b9ff914fcf..f7e90dc8e60 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -409,14 +409,19 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     protected void receivedSessionEvent(SessionEvent event) {
         isConnected = event.isConnected();
-
-        sessionListeners.forEach(l -> {
-            try {
-                l.accept(event);
-            } catch (Throwable t) {
-                log.warn("Error in processing session event", t);
-            }
-        });
+        try {
+            executor.execute(() -> {
+                sessionListeners.forEach(l -> {
+                    try {
+                        l.accept(event);
+                    } catch (Throwable t) {
+                        log.warn("Error in processing session event " + event, t);
+                    }
+                });
+            });
+        } catch (RejectedExecutionException e) {
+            log.warn("Error in processing session event " + event, e);
+        }
     }
 
     @Override