You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 01:28:48 UTC
[pulsar] 01/02: [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b121d854c0933309475891774b7a90911afb2c81
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Sep 14 11:24:57 2022 +0300
[fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)
---
.../metadata/bookkeeper/PulsarRegistrationClient.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 38e2a33ef3f..52b50e3ea4b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -21,12 +21,15 @@ package org.apache.pulsar.metadata.bookkeeper;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.versioning.Version;
@@ -46,6 +49,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService executor;
public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
@@ -60,11 +64,15 @@ public class PulsarRegistrationClient implements RegistrationClient {
this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
+ this.executor = Executors
+ .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
+
store.registerListener(this::updatedBookies);
}
@Override
public void close() {
+ executor.shutdownNow();
}
@Override
@@ -99,7 +107,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
return getWritableBookies()
- .thenAccept(registrationListener::onBookiesChanged);
+ .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
}
@Override
@@ -111,7 +119,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
return getReadOnlyBookies()
- .thenAccept(registrationListener::onBookiesChanged);
+ .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
}
@Override
@@ -124,11 +132,11 @@ public class PulsarRegistrationClient implements RegistrationClient {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies ->
readOnlyBookiesWatchers.keySet()
- .forEach(w -> w.onBookiesChanged(bookies)));
+ .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.keySet()
- .forEach(w -> w.onBookiesChanged(bookies)));
+ .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
}
}
}