You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/06 07:41:27 UTC

[pulsar] branch master updated: [improve][broker] Avoid using blocking calls for the async method ``checkTopicOwnership`` (#15023)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c59402ef09c [improve][broker] Avoid using blocking calls for the async method ``checkTopicOwnership`` (#15023)
c59402ef09c is described below

commit c59402ef09c870469a4d5ff835fa6222518704b9
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Apr 6 15:41:17 2022 +0800

    [improve][broker] Avoid using blocking calls for the async method ``checkTopicOwnership`` (#15023)
---
 .../apache/pulsar/broker/namespace/NamespaceService.java    |  2 +-
 .../org/apache/pulsar/broker/namespace/OwnershipCache.java  | 13 +++++++++++--
 .../apache/pulsar/broker/namespace/OwnershipCacheTest.java  |  2 +-
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 91383b87502..a321c035c78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1029,7 +1029,7 @@ public class NamespaceService implements AutoCloseable {
 
     public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
         return getBundleAsync(topicName)
-                .thenApply(ownershipCache::checkOwnership);
+                .thenCompose(ownershipCache::checkOwnershipAsync);
     }
 
     public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 382a3ccdf4a..3d8bf2654c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -148,8 +148,13 @@ public class OwnershipCache {
      * @param bundle namespace bundle
      * @return future that will complete with check result
      */
-    public boolean checkOwnership(NamespaceBundle bundle) {
-        return getOwnedBundle(bundle) != null;
+    public CompletableFuture<Boolean> checkOwnershipAsync(NamespaceBundle bundle) {
+        Optional<CompletableFuture<OwnedBundle>> ownedBundleFuture = getOwnedBundleAsync(bundle);
+        if (!ownedBundleFuture.isPresent()) {
+            return CompletableFuture.completedFuture(false);
+        }
+        return ownedBundleFuture.get()
+                .thenApply(bd -> bd != null && bd.isActive());
     }
 
     /**
@@ -279,6 +284,10 @@ public class OwnershipCache {
         }
     }
 
+    public Optional<CompletableFuture<OwnedBundle>> getOwnedBundleAsync(NamespaceBundle bundle) {
+        return Optional.ofNullable(ownedBundlesCache.getIfPresent(bundle));
+    }
+
     /**
      * Disable bundle in local cache and on zk.
      *
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 143d5ef78f5..dde25fa2eed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -401,7 +401,7 @@ public class OwnershipCacheTest {
         assertFalse(data3.isDisabled());
         assertNotNull(cache.getOwnedBundle(testFullBundle));
 
-        assertTrue(cache.checkOwnership(testFullBundle));
+        assertTrue(cache.checkOwnershipAsync(testFullBundle).get());
         assertEquals(data2.getNativeUrl(), selfBrokerUrl);
         assertFalse(data2.isDisabled());
         assertNotNull(cache.getOwnedBundle(testFullBundle));