You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/25 08:53:14 UTC

[pulsar] branch branch-2.10 updated: [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new f764350ff9b [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)
f764350ff9b is described below

commit f764350ff9b54cd8b4312d1e6006d1339d7cbc8a
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Sep 14 11:24:57 2022 +0300

    [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)
    
    (cherry picked from commit 8e0ae8055137ef3fedceccf3bec87ba2924e607d)
---
 .../metadata/bookkeeper/PulsarRegistrationClient.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index d27a3df1a2f..1207762046e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.metadata.bookkeeper;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,8 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.discover.BookieServiceInfo;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.versioning.LongVersion;
@@ -53,6 +56,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
     private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
     private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
+    private final ScheduledExecutorService executor;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
@@ -68,11 +72,15 @@ public class PulsarRegistrationClient implements RegistrationClient {
         this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
         this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
 
+        this.executor = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
+
         store.registerListener(this::updatedBookies);
     }
 
     @Override
     public void close() {
+        executor.shutdownNow();
     }
 
     @Override
@@ -107,7 +115,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
         writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
         return getWritableBookies()
-                .thenAccept(registrationListener::onBookiesChanged);
+                .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
     }
 
     @Override
@@ -119,7 +127,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
         readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
         return getReadOnlyBookies()
-                .thenAccept(registrationListener::onBookiesChanged);
+                .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
     }
 
     @Override
@@ -132,11 +140,11 @@ public class PulsarRegistrationClient implements RegistrationClient {
             if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
                 getReadOnlyBookies().thenAccept(bookies ->
                         readOnlyBookiesWatchers.keySet()
-                                .forEach(w -> w.onBookiesChanged(bookies)));
+                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
             } else if (n.getPath().startsWith(bookieRegistrationPath)) {
                 getWritableBookies().thenAccept(bookies ->
                         writableBookiesWatchers.keySet()
-                                .forEach(w -> w.onBookiesChanged(bookies)));
+                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
             }
         }
     }