You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 01:28:48 UTC

[pulsar] 01/02: [fix][metadata] Don't execute BookKeeper metadata callbacks on ZooKeeper event thread (#17620)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b121d854c0933309475891774b7a90911afb2c81
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Sep 14 11:24:57 2022 +0300

    [fix][metadata] Don't execute BookKeeper metadata callbacks on ZooKeeper event thread (#17620)
---
 .../metadata/bookkeeper/PulsarRegistrationClient.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 38e2a33ef3f..52b50e3ea4b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -21,12 +21,15 @@ package org.apache.pulsar.metadata.bookkeeper;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.versioning.Version;
@@ -46,6 +49,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
     private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService executor;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
@@ -60,11 +64,15 @@ public class PulsarRegistrationClient implements RegistrationClient {
         this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
         this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
 
+        this.executor = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
+
         store.registerListener(this::updatedBookies);
     }
 
     @Override
     public void close() {
+        executor.shutdownNow();
     }
 
     @Override
@@ -99,7 +107,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
         writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
         return getWritableBookies()
-                .thenAccept(registrationListener::onBookiesChanged);
+                .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
     }
 
     @Override
@@ -111,7 +119,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
         readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
         return getReadOnlyBookies()
-                .thenAccept(registrationListener::onBookiesChanged);
+                .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
     }
 
     @Override
@@ -124,11 +132,11 @@ public class PulsarRegistrationClient implements RegistrationClient {
             if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
                 getReadOnlyBookies().thenAccept(bookies ->
                         readOnlyBookiesWatchers.keySet()
-                                .forEach(w -> w.onBookiesChanged(bookies)));
+                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
             } else if (n.getPath().startsWith(bookieRegistrationPath)) {
                 getWritableBookies().thenAccept(bookies ->
                         writableBookiesWatchers.keySet()
-                                .forEach(w -> w.onBookiesChanged(bookies)));
+                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
             }
         }
     }