You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/30 12:08:33 UTC

[pulsar] branch branch-3.0 updated: [fix][broker]Fix deadlock of metadata store (#20189)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new dd40137ee40 [fix][broker]Fix deadlock of metadata store (#20189)
dd40137ee40 is described below

commit dd40137ee401286df97607de528007eef8e82cf0
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu May 18 11:15:53 2023 +0800

    [fix][broker]Fix deadlock of metadata store (#20189)
    
    Motivation: This task loadOrCreatePersistentTopic occupied the event thread of the ZK client so that other ZK tasks could not be finished anymore(Including the task itself), and it calls bundlesCache.synchronous().get(nsname) which is a blocking method.
    
    Modification: Since the method getBundle(topic) will eventually call the method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) to avoid blocking the thread.
    (cherry picked from commit 4678c36d4023a2bb8361e0a70673b96de33f06ac)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 32 ++++++++++++----------
 .../pulsar/broker/namespace/OwnershipCache.java    |  6 ++--
 2 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a0e2bf7534c..9d8d9e3890a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -38,7 +38,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -1137,16 +1139,17 @@ public class NamespaceService implements AutoCloseable {
                 new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()));
     }
 
+    /**
+     * @Deprecated This method is only used in test now.
+     */
+    @Deprecated
     public boolean isServiceUnitActive(TopicName topicName) {
         try {
-            OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(getBundle(topicName));
-            if (ownedBundle == null) {
-                return false;
-            }
-            return ownedBundle.isActive();
-        } catch (Exception e) {
-            LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName, e);
-            return false;
+            return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
+                    .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
+            throw new RuntimeException(e);
         }
     }
 
@@ -1156,12 +1159,13 @@ public class NamespaceService implements AutoCloseable {
             return getBundleAsync(topicName)
                     .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
         }
-        Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName));
-        if (!res.isPresent()) {
-            return CompletableFuture.completedFuture(false);
-        }
-
-        return res.get().thenApply(ob -> ob != null && ob.isActive());
+        return getBundleAsync(topicName).thenCompose(bundle -> {
+            Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle);
+            if (!optionalFuture.isPresent()) {
+                return CompletableFuture.completedFuture(false);
+            }
+            return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive());
+        });
     }
 
     private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 7d0b5a41477..86003153714 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -288,10 +288,10 @@ public class OwnershipCache {
 
     /**
      * Disable bundle in local cache and on zk.
-     *
-     * @param bundle
-     * @throws Exception
+     * @Deprecated This is a dangerous method  which is currently only used for test, it will occupy the ZK thread.
+     * Please switch to your own thread after calling this method.
      */
+    @Deprecated
     public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
         return updateBundleState(bundle, false)
                 .thenCompose(__ -> {