You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/06 07:41:27 UTC
[pulsar] branch master updated: [improve][broker] Avoid using blocking calls for the async method ``checkTopicOwnership`` (#15023)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c59402ef09c [improve][broker] Avoid using blocking calls for the async method ``checkTopicOwnership`` (#15023)
c59402ef09c is described below
commit c59402ef09c870469a4d5ff835fa6222518704b9
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Apr 6 15:41:17 2022 +0800
[improve][broker] Avoid using blocking calls for the async method ``checkTopicOwnership`` (#15023)
---
.../apache/pulsar/broker/namespace/NamespaceService.java | 2 +-
.../org/apache/pulsar/broker/namespace/OwnershipCache.java | 13 +++++++++++--
.../apache/pulsar/broker/namespace/OwnershipCacheTest.java | 2 +-
3 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 91383b87502..a321c035c78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1029,7 +1029,7 @@ public class NamespaceService implements AutoCloseable {
public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
return getBundleAsync(topicName)
- .thenApply(ownershipCache::checkOwnership);
+ .thenCompose(ownershipCache::checkOwnershipAsync);
}
public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 382a3ccdf4a..3d8bf2654c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -148,8 +148,13 @@ public class OwnershipCache {
* @param bundle namespace bundle
* @return future that will complete with check result
*/
- public boolean checkOwnership(NamespaceBundle bundle) {
- return getOwnedBundle(bundle) != null;
+ public CompletableFuture<Boolean> checkOwnershipAsync(NamespaceBundle bundle) {
+ Optional<CompletableFuture<OwnedBundle>> ownedBundleFuture = getOwnedBundleAsync(bundle);
+ if (!ownedBundleFuture.isPresent()) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return ownedBundleFuture.get()
+ .thenApply(bd -> bd != null && bd.isActive());
}
/**
@@ -279,6 +284,10 @@ public class OwnershipCache {
}
}
+ public Optional<CompletableFuture<OwnedBundle>> getOwnedBundleAsync(NamespaceBundle bundle) {
+ return Optional.ofNullable(ownedBundlesCache.getIfPresent(bundle));
+ }
+
/**
* Disable bundle in local cache and on zk.
*
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 143d5ef78f5..dde25fa2eed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -401,7 +401,7 @@ public class OwnershipCacheTest {
assertFalse(data3.isDisabled());
assertNotNull(cache.getOwnedBundle(testFullBundle));
- assertTrue(cache.checkOwnership(testFullBundle));
+ assertTrue(cache.checkOwnershipAsync(testFullBundle).get());
assertEquals(data2.getNativeUrl(), selfBrokerUrl);
assertFalse(data2.isDisabled());
assertNotNull(cache.getOwnedBundle(testFullBundle));