You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/30 12:08:33 UTC
[pulsar] branch branch-3.0 updated: [fix][broker]Fix deadlock of metadata store (#20189)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new dd40137ee40 [fix][broker]Fix deadlock of metadata store (#20189)
dd40137ee40 is described below
commit dd40137ee401286df97607de528007eef8e82cf0
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu May 18 11:15:53 2023 +0800
[fix][broker]Fix deadlock of metadata store (#20189)
Motivation: The task loadOrCreatePersistentTopic runs on the event thread of the ZK client and calls bundlesCache.synchronous().get(nsname), which is a blocking method. While that call blocks, the event thread stays occupied, so no other ZK task can complete (including the task itself), which results in a deadlock.
Modification: Since getBundle(topic) eventually calls bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) so that the ZK event thread is never blocked.
(cherry picked from commit 4678c36d4023a2bb8361e0a70673b96de33f06ac)
---
.../pulsar/broker/namespace/NamespaceService.java | 32 ++++++++++++----------
.../pulsar/broker/namespace/OwnershipCache.java | 6 ++--
2 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a0e2bf7534c..9d8d9e3890a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -38,7 +38,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -1137,16 +1139,17 @@ public class NamespaceService implements AutoCloseable {
new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()));
}
+ /**
+ * @Deprecated This method is only used in test now.
+ */
+ @Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
try {
- OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(getBundle(topicName));
- if (ownedBundle == null) {
- return false;
- }
- return ownedBundle.isActive();
- } catch (Exception e) {
- LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName, e);
- return false;
+ return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
+ .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
+ throw new RuntimeException(e);
}
}
@@ -1156,12 +1159,13 @@ public class NamespaceService implements AutoCloseable {
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
- Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName));
- if (!res.isPresent()) {
- return CompletableFuture.completedFuture(false);
- }
-
- return res.get().thenApply(ob -> ob != null && ob.isActive());
+ return getBundleAsync(topicName).thenCompose(bundle -> {
+ Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle);
+ if (!optionalFuture.isPresent()) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive());
+ });
}
private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 7d0b5a41477..86003153714 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -288,10 +288,10 @@ public class OwnershipCache {
/**
* Disable bundle in local cache and on zk.
- *
- * @param bundle
- * @throws Exception
+ * @Deprecated This is a dangerous method which is currently only used for test, it will occupy the ZK thread.
+ * Please switch to your own thread after calling this method.
*/
+ @Deprecated
public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
return updateBundleState(bundle, false)
.thenCompose(__ -> {