You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:01:27 UTC

[pulsar] branch branch-2.7 updated: Issue #8533: Change method `getWebServiceUrl` into async (#8746)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0f18723  Issue #8533: Change method `getWebServiceUrl` into async (#8746)
0f18723 is described below

commit 0f18723f42b9b5c1b68752d708cc2b5efa9025a6
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Dec 11 12:59:09 2020 +0800

    Issue #8533: Change method `getWebServiceUrl` into async (#8746)
    
    Fixes #8533. Only the issue-related method was modified; the other calls in this section may need to be changed to asynchronous later.
    
    (cherry picked from commit 6a7ec2eec51c317af101e3da207add8e47b19151)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 101 +++++++++++----------
 .../broker/admin/v1/NonPersistentTopics.java       |  80 +++++++++-------
 .../broker/admin/v2/NonPersistentTopics.java       |  54 ++++++-----
 .../pulsar/broker/namespace/NamespaceService.java  |  30 ++++--
 .../pulsar/broker/web/PulsarWebResource.java       |   4 +-
 .../common/naming/NamespaceBundleFactory.java      |   4 +
 .../apache/pulsar/broker/admin/NamespacesTest.java |   6 +-
 7 files changed, 162 insertions(+), 117 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index df18b53..208902e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.admin.impl;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -637,7 +638,7 @@ public abstract class NamespacesBase extends AdminResource {
             AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
             if (null != authService) {
                 authService.grantPermissionAsync(namespaceName, actions, role, null/*additional auth-data json*/)
-                    .get();
+                        .get();
             } else {
                 throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
             }
@@ -1308,26 +1309,34 @@ public abstract class NamespacesBase extends AdminResource {
 
         validatePoliciesReadOnlyAccess();
 
-        if (!isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange)) {
-            log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
-                    bundleRange);
-            asyncResponse.resume(Response.noContent().build());
-            return;
-        }
-
-        NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                authoritative, true);
-
-        pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
-                .thenRun(() -> {
-                    log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), nsBundle.toString());
-                    asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange,
-                            ex);
-                    asyncResponse.resume(new RestException(ex));
-                    return null;
-                });
+        isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange).thenAccept(flag -> {
+            if (!flag) {
+                log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
+                        bundleRange);
+                asyncResponse.resume(Response.noContent().build());
+                return;
+            }
+            NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+                    authoritative, true);
+
+            pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
+                    .thenRun(() -> {
+                        log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), nsBundle.toString());
+                        asyncResponse.resume(Response.noContent().build());
+                    }).exceptionally(ex -> {
+                log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange,
+                        ex);
+                asyncResponse.resume(new RestException(ex));
+                return null;
+            });
+        }).exceptionally((ex) -> {
+            if (ex.getCause() instanceof WebApplicationException) {
+                asyncResponse.resume(ex.getCause());
+            } else {
+                asyncResponse.resume(new RestException(ex.getCause()));
+            }
+            return null;
+        });
     }
 
     @SuppressWarnings("deprecation")
@@ -1362,7 +1371,7 @@ public abstract class NamespacesBase extends AdminResource {
         } catch (ExecutionException e) {
             if (e.getCause() instanceof IllegalArgumentException) {
                 log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
-                    bundleRange, e.getMessage());
+                        bundleRange, e.getMessage());
                 throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request");
             } else {
                 log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e);
@@ -1548,7 +1557,7 @@ public abstract class NamespacesBase extends AdminResource {
                 namespaceName);
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}: does not exist",
-                clientAppId(), namespaceName);
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
@@ -1558,7 +1567,7 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
             log.error("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
-                namespaceName, e);
+                    namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -1648,7 +1657,7 @@ public abstract class NamespacesBase extends AdminResource {
                 namespaceName);
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}: does not exist",
-                clientAppId(), namespaceName);
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
@@ -1658,7 +1667,7 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
             log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
-                namespaceName, e);
+                    namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -1902,7 +1911,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
-            boolean authoritative) {
+                                                                boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
         checkNotNull(subscription, "Subscription should not be null");
 
@@ -1946,7 +1955,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
-            boolean authoritative) {
+                                                                      boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
         checkNotNull(subscription, "Subscription should not be null");
         checkNotNull(bundleRange, "BundleRange should not be null");
@@ -1969,7 +1978,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
-            boolean authoritative) {
+                                                boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
         checkNotNull(subscription, "Subscription should not be null");
 
@@ -2131,13 +2140,13 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies){
+    protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) {
         validateSuperUserAccess();
         validatePoliciesReadOnlyAccess();
         internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
     }
 
-    protected void internalSetPolicies(String fieldName, Object value){
+    protected void internalSetPolicies(String fieldName, Object value) {
         try {
             Stat nodeStat = new Stat();
             final String path = path(POLICIES, namespaceName.toString());
@@ -2270,7 +2279,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
-            String tenant) {
+                                                             String tenant) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
         checkNotNull(cluster, "Cluster should not be null");
         checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
@@ -2758,11 +2767,11 @@ public abstract class NamespacesBase extends AdminResource {
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
             log.info("[{}] Successfully updated compactionThreshold configuration: namespace={}, value={}",
-                     clientAppId(), namespaceName, policies.compaction_threshold);
+                    clientAppId(), namespaceName, policies.compaction_threshold);
 
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update compactionThreshold configuration for namespace {}: does not exist",
-                     clientAppId(), namespaceName);
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn("[{}] Failed to update compactionThreshold configuration for namespace {}: concurrent modification",
@@ -2772,7 +2781,7 @@ public abstract class NamespacesBase extends AdminResource {
             throw pfe;
         } catch (Exception e) {
             log.error("[{}] Failed to update compactionThreshold configuration for namespace {}",
-                      clientAppId(), namespaceName, e);
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2806,21 +2815,21 @@ public abstract class NamespacesBase extends AdminResource {
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
             log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}",
-                     clientAppId(), namespaceName, policies.offload_threshold);
+                    clientAppId(), namespaceName, policies.offload_threshold);
 
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update offloadThreshold configuration for namespace {}: does not exist",
-                     clientAppId(), namespaceName);
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn("[{}] Failed to update offloadThreshold configuration for namespace {}: concurrent modification",
-                     clientAppId(), namespaceName);
+                    clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
             log.error("[{}] Failed to update offloadThreshold configuration for namespace {}",
-                      clientAppId(), namespaceName, e);
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2854,11 +2863,11 @@ public abstract class NamespacesBase extends AdminResource {
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
             log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={}, value={}",
-                     clientAppId(), namespaceName, policies.offload_deletion_lag_ms);
+                    clientAppId(), namespaceName, policies.offload_deletion_lag_ms);
 
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace {}: does not exist",
-                     clientAppId(), namespaceName);
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace {}: concurrent modification",
@@ -2868,7 +2877,7 @@ public abstract class NamespacesBase extends AdminResource {
             throw pfe;
         } catch (Exception e) {
             log.error("[{}] Failed to update offloadDeletionLag configuration for namespace {}",
-                      clientAppId(), namespaceName, e);
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2958,21 +2967,21 @@ public abstract class NamespacesBase extends AdminResource {
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
             log.info("[{}] Successfully updated {} configuration: namespace={}, value={}",
-                     clientAppId(), policyName, namespaceName, getter.apply(policies));
+                    clientAppId(), policyName, namespaceName, getter.apply(policies));
 
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update {} configuration for namespace {}: does not exist",
-                     clientAppId(), policyName, namespaceName);
+                    clientAppId(), policyName, namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn("[{}] Failed to update {} configuration for namespace {}: concurrent modification",
-                     clientAppId(), policyName, namespaceName);
+                    clientAppId(), policyName, namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
             log.error("[{}] Failed to update {} configuration for namespace {}",
-                      clientAppId(), policyName, namespaceName, e);
+                    clientAppId(), policyName, namespaceName, e);
             throw new RestException(e);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 179920b..bc3ddd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.admin.v1;
 
 import com.google.common.collect.Lists;
@@ -28,7 +29,7 @@ import io.swagger.annotations.ApiResponses;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.TimeUnit;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
@@ -73,12 +74,15 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiOperation(hidden = true, value = "Get partitioned topic metadata.")
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
-            @ApiResponse(code = 403, message = "Don't have admin permission") })
+            @ApiResponse(code = 403, message = "Don't have admin permission")})
     public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
+                                                           @PathParam("cluster") String cluster,
+                                                           @PathParam("namespace") String namespace,
+                                                           @PathParam("topic") @Encoded String encodedTopic,
+                                                           @QueryParam("authoritative") @DefaultValue("false")
+                                                                       boolean authoritative,
+                                                           @QueryParam("checkAllowAutoCreation") @DefaultValue("false")
+                                                                       boolean checkAllowAutoCreation) {
         validateTopicName(property, cluster, namespace, encodedTopic);
         return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
     }
@@ -89,11 +93,12 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Topic does not exist") })
+            @ApiResponse(code = 404, message = "Topic does not exist")})
     public NonPersistentTopicStats getStats(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+                                            @PathParam("cluster") String cluster,
+                                            @PathParam("namespace") String namespace,
+                                            @PathParam("topic") @Encoded String encodedTopic,
+                                            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
         validateAdminOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
@@ -106,12 +111,15 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Topic does not exist") })
+            @ApiResponse(code = 404, message = "Topic does not exist")})
     public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
+                                                         @PathParam("cluster") String cluster, @PathParam("namespace")
+                                                                     String namespace,
+                                                         @PathParam("topic") @Encoded String encodedTopic,
+                                                         @QueryParam("authoritative") @DefaultValue("false")
+                                                                     boolean authoritative,
+                                                         @QueryParam("metadata") @DefaultValue("false")
+                                                                     boolean metadata) {
         validateTopicName(property, cluster, namespace, encodedTopic);
         validateAdminOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
@@ -125,13 +133,17 @@ public class NonPersistentTopics extends PersistentTopics {
 
     @PUT
     @Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
-    @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
-            @ApiResponse(code = 409, message = "Partitioned topic already exist") })
-    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            int numPartitions) {
+    @ApiOperation(hidden = true, value = "Create a partitioned topic.",
+            notes = "It needs to be called before creating a producer on a partitioned topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal"
+                    + " to maxNumPartitionsPerPartitionedTopic"),
+            @ApiResponse(code = 409, message = "Partitioned topic already exist")})
+    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
+                                       @PathParam("property") String property, @PathParam("cluster") String cluster,
+                                       @PathParam("namespace") String namespace, @PathParam("topic") @Encoded
+                                               String encodedTopic,
+                                       int numPartitions) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
             internalCreatePartitionedTopic(asyncResponse, numPartitions);
@@ -147,11 +159,11 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Topic does not exist") })
+            @ApiResponse(code = 404, message = "Topic does not exist")})
     public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+                            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+                            @PathParam("topic") @Encoded String encodedTopic,
+                            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
             internalUnloadTopic(asyncResponse, authoritative);
@@ -170,7 +182,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist") })
     public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+                        @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
         log.info("[{}] list of topics on namespace {}/{}/{}", clientAppId(), property, cluster, namespace);
 
         Policies policies = null;
@@ -250,13 +262,15 @@ public class NonPersistentTopics extends PersistentTopics {
             validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
         }
         NamespaceName fqnn = NamespaceName.get(property, cluster, namespace);
-        if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)) {
-            log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property, cluster,
-                    namespace, bundleRange);
-            return null;
-        }
-        NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
         try {
+            if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)
+                    .get(pulsar().getConfig().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
+                log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property,
+                        cluster, namespace, bundleRange);
+                return null;
+            }
+            NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange,
+                    true, true);
             final List<String> topicList = Lists.newArrayList();
             pulsar().getBrokerService().forEachTopic(topic -> {
                 TopicName topicName = TopicName.get(topic.getName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 6254311..59b9dad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.admin.v2;
 
 import com.google.common.collect.Lists;
@@ -43,6 +44,7 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.pulsar.broker.PulsarServerException;
@@ -287,9 +289,9 @@ public class NonPersistentTopics extends PersistentTopics {
             }
 
             final List<String> nonPersistentTopics =
-                topics.stream()
-                      .filter(name -> !TopicName.get(name).isPersistent())
-                      .collect(Collectors.toList());
+                    topics.stream()
+                            .filter(name -> !TopicName.get(name).isPersistent())
+                            .collect(Collectors.toList());
             asyncResponse.resume(nonPersistentTopics);
             return null;
         });
@@ -306,7 +308,8 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
     })
-    public List<String> getListFromBundle(
+    public void getListFromBundle(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -324,29 +327,32 @@ public class NonPersistentTopics extends PersistentTopics {
         // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
         validateGlobalNamespaceOwnership(namespaceName);
 
-        if (!isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange)) {
-            log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
-                    bundleRange);
-            return null;
-        }
-
-        NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, true, true);
-        try {
-            final List<String> topicList = Lists.newArrayList();
-            pulsar().getBrokerService().forEachTopic(topic -> {
-                TopicName topicName = TopicName.get(topic.getName());
-                if (nsBundle.includes(topicName)) {
-                    topicList.add(topic.getName());
+        isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange).thenAccept(flag -> {
+            if (!flag) {
+                log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
+                        bundleRange);
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
+                        bundleRange, true, true);
+                try {
+                    final List<String> topicList = Lists.newArrayList();
+                    pulsar().getBrokerService().forEachTopic(topic -> {
+                        TopicName topicName = TopicName.get(topic.getName());
+                        if (nsBundle.includes(topicName)) {
+                            topicList.add(topic.getName());
+                        }
+                    });
+                    asyncResponse.resume(topicList);
+                } catch (Exception e) {
+                    log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(),
+                            namespaceName, bundleRange, e);
+                    asyncResponse.resume(new RestException(e));
                 }
-            });
-            return topicList;
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e);
-            throw new RestException(e);
-        }
+            }
+        });
     }
 
-
     protected void validateAdminOperationOnTopic(TopicName topicName, boolean authoritative) {
         validateAdminAccessForTenant(topicName.getTenant());
         validateTopicOwnership(topicName, authoritative);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a73085f..99b661e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -234,34 +234,46 @@ public class NamespaceService {
         return bundleFactory.getFullBundle(fqnn);
     }
 
+    private CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) throws Exception {
+        return bundleFactory.getFullBundleAsync(fqnn);
+    }
+
     /**
-     * Return the URL of the broker who's owning a particular service unit.
+     * Asynchronously return the URL of the broker that owns a particular service unit.
      *
-     * If the service unit is not owned, return an empty optional
+     * If the service unit is not owned, the returned CompletableFuture completes with an empty Optional.
      */
-    public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
+    public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) throws Exception {
         if (suName instanceof TopicName) {
             TopicName name = (TopicName) suName;
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Getting web service URL of topic: {} - options: {}", name, options);
             }
-            return this.internalGetWebServiceUrl(getBundle(name), options)
-                    .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+            return getBundleAsync(name)
+                    .thenCompose(namespaceBundle -> internalGetWebServiceUrl(namespaceBundle, options));
         }
 
         if (suName instanceof NamespaceName) {
-            return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), options)
-                    .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+            return getFullBundleAsync((NamespaceName) suName)
+                    .thenCompose(namespaceBundle -> internalGetWebServiceUrl(namespaceBundle, options));
         }
 
         if (suName instanceof NamespaceBundle) {
-            return this.internalGetWebServiceUrl((NamespaceBundle) suName, options)
-                    .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+            return internalGetWebServiceUrl((NamespaceBundle) suName, options);
         }
 
         throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
     }
 
+    /**
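+     * Return the URL of the broker that owns a particular service unit, blocking for at most the
+     * configured ZooKeeper operation timeout (in seconds) while the asynchronous lookup completes.
+     * If the service unit is not owned, return an empty Optional.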
+     */
+    public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
+        return getWebServiceUrlAsync(suName, options).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+    }
+
     private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {
 
         return findBrokerServiceUrl(bundle, options).thenApply(lookupResult -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 7478c9d..677d81e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -501,7 +501,7 @@ public abstract class PulsarWebResource {
     /**
      * Checks whether a given bundle is currently loaded by any broker
      */
-    protected boolean isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
+    protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
             String bundleRange) {
         NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
         NamespaceService nsService = pulsar().getNamespaceService();
@@ -512,7 +512,7 @@ public abstract class PulsarWebResource {
                 .readOnly(true)
                 .loadTopicsInBundle(false).build();
         try {
-            return nsService.getWebServiceUrl(nsBundle, options).isPresent();
+            return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(optionUrl -> optionUrl.isPresent());
         } catch (Exception e) {
             log.error("[{}] Failed to check whether namespace bundle is owned {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
             throw new RestException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 1de6212..0e0b031 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -181,6 +181,10 @@ public class NamespaceBundleFactory implements ZooKeeperCacheListener<LocalPolic
         return bundlesCache.synchronous().get(fqnn).getFullBundle();
     }
 
+    public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) throws Exception {
+        return bundlesCache.get(fqnn).thenApply(NamespaceBundles::getFullBundle);
+    }
+
     public long getLongHashCode(String name) {
         return this.hashFunc.hashString(name, Charsets.UTF_8).padToLong();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b4a0ae8..d51911a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -909,8 +909,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
         final NamespaceName testNs = NamespaceName.get(this.testTenant, this.testLocalCluster, bundledNsLocal);
 
-        doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
-                .getWebServiceUrl(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)),
+        doReturn(CompletableFuture.completedFuture(Optional.of(localWebServiceUrl))).when(nsSvc)
+                .getWebServiceUrlAsync(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)),
                         Mockito.any());
         doReturn(true).when(nsSvc)
                 .isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)));
@@ -919,7 +919,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         NamespaceBundle testBundle = nsBundles.getBundles().get(0);
         // make one bundle owned
         LookupOptions optionsHttps = LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
-        doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testBundle, optionsHttps);
+        doReturn(CompletableFuture.completedFuture(Optional.of(localWebServiceUrl))).when(nsSvc).getWebServiceUrlAsync(testBundle, optionsHttps);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testBundle);
         doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
         AsyncResponse response = mock(AsyncResponse.class);