You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/24 01:58:19 UTC
[pulsar] 02/03: [fix][broker]Fix deadlock of metadata store (#20189)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c71f4c431b3d07baed9d1f77acf573e66dcb48c4
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu May 18 11:15:53 2023 +0800
[fix][broker]Fix deadlock of metadata store (#20189)
Motivation: This task loadOrCreatePersistentTopic occupied the event thread of the ZK client so that other ZK tasks could not be finished anymore(Including the task itself), and it calls bundlesCache.synchronous().get(nsname) which is a blocking method.
Modification: Since the method getBundle(topic) will eventually call the method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) to avoid blocking the thread.
---
.../pulsar/broker/namespace/NamespaceService.java | 32 ++++++++++++----------
.../pulsar/broker/namespace/OwnershipCache.java | 6 ++--
2 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 700819e3f2a..4b2df9c5487 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -38,7 +38,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -1009,26 +1011,28 @@ public class NamespaceService implements AutoCloseable {
new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()));
}
+ /**
+ * @Deprecated This method is only used in test now.
+ */
+ @Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
try {
- OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(getBundle(topicName));
- if (ownedBundle == null) {
- return false;
- }
- return ownedBundle.isActive();
- } catch (Exception e) {
- LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName, e);
- return false;
+ return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
+ .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
+ throw new RuntimeException(e);
}
}
public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
- Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName));
- if (!res.isPresent()) {
- return CompletableFuture.completedFuture(false);
- }
-
- return res.get().thenApply(ob -> ob != null && ob.isActive());
+ return getBundleAsync(topicName).thenCompose(bundle -> {
+ Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle);
+ if (!optionalFuture.isPresent()) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive());
+ });
}
private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index a6ba5fb5d90..05e89e78803 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -295,10 +295,10 @@ public class OwnershipCache {
/**
* Disable bundle in local cache and on zk.
- *
- * @param bundle
- * @throws Exception
+ * @Deprecated This is a dangerous method which is currently only used for test, it will occupy the ZK thread.
+ * Please switch to your own thread after calling this method.
*/
+ @Deprecated
public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
return updateBundleState(bundle, false)
.thenCompose(__ -> {