You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/07/25 09:15:38 UTC

[pulsar] branch master updated: [improve][broker] Make splitNamespaceBundle and getTopicHashPositions async (#16411)

This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f47c705e3cc [improve][broker] Make splitNamespaceBundle and getTopicHashPositions async  (#16411)
f47c705e3cc is described below

commit f47c705e3cc1cb0c34e48ad1ca71060f1202c70a
Author: gaozhangmin <zh...@apache.org>
AuthorDate: Mon Jul 25 17:15:25 2022 +0800

    [improve][broker] Make splitNamespaceBundle and getTopicHashPositions async  (#16411)
    
    * make splitNamespaceBundle and getTopicHashPositions async
    
    * appli comments
    
    * Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
    
    Co-authored-by: Zixuan Liu <no...@gmail.com>
    
    * Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
    
    Co-authored-by: Zixuan Liu <no...@gmail.com>
    
    * Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
    
    Co-authored-by: Zixuan Liu <no...@gmail.com>
    
    * Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
    
    Co-authored-by: Zixuan Liu <no...@gmail.com>
    
    * Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
    
    Co-authored-by: Zixuan Liu <no...@gmail.com>
    
    * Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
    
    Co-authored-by: Zixuan Liu <no...@gmail.com>
    
    * fix error
    
    * fix error
    
    * fix check style error
    
    Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
    Co-authored-by: Zixuan Liu <no...@gmail.com>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 189 ++++++++++-----------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 107 +++++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  65 ++++---
 .../apache/pulsar/broker/admin/NamespacesTest.java |   9 +-
 4 files changed, 214 insertions(+), 156 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f5da9ff6aa3..5b069edf75b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1115,116 +1115,99 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName, boolean authoritative,
-                                                boolean unload, String splitAlgorithmName, List<Long> splitBoundaries) {
-        validateSuperUserAccess();
-        checkNotNull(bundleName, "BundleRange should not be null");
-        log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
-
-        String bundleRange = getBundleRange(bundleName);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        if (namespaceName.isGlobal()) {
-            // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
-            validateGlobalNamespaceOwnership(namespaceName);
-        } else {
-            validateClusterOwnership(namespaceName.getCluster());
-            validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
-        }
-
-        validatePoliciesReadOnlyAccess();
-
-        List<String> supportedNamespaceBundleSplitAlgorithms =
-                pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
-        if (StringUtils.isNotBlank(splitAlgorithmName)) {
-            if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
-                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                        "Unsupported namespace bundle split algorithm, supported algorithms are "
-                                + supportedNamespaceBundleSplitAlgorithms));
-            }
-            if (splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
-                    && (splitBoundaries == null || splitBoundaries.size() == 0)) {
-                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                        "With specified_positions_divide split algorithm, splitBoundaries must not be emtpy"));
-            }
-        }
-
-        NamespaceBundle nsBundle;
-        try {
-            nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                    authoritative, false);
-        } catch (Exception e) {
-            asyncResponse.resume(e);
-            return;
-        }
+    protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundleName,
+                                                                        boolean authoritative, boolean unload,
+                                                                        String splitAlgorithmName,
+                                                                        List<Long> splitBoundaries) {
+        return validateSuperUserAccessAsync()
+                .thenAccept(__ -> {
+                    checkNotNull(bundleName, "BundleRange should not be null");
+                    log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
+                    List<String> supportedNamespaceBundleSplitAlgorithms =
+                            pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
+                    if (StringUtils.isNotBlank(splitAlgorithmName)) {
+                        if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
+                            throw new RestException(Status.PRECONDITION_FAILED,
+                                    "Unsupported namespace bundle split algorithm, supported algorithms are "
+                                            + supportedNamespaceBundleSplitAlgorithms);
+                        }
+                        if (splitAlgorithmName
+                                .equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
+                                && (splitBoundaries == null || splitBoundaries.size() == 0)) {
+                            throw new RestException(Status.PRECONDITION_FAILED,
+                                    "With specified_positions_divide split algorithm, splitBoundaries must not be "
+                                            + "emtpy");
+                        }
+                    }
+                })
+                .thenCompose(__ -> {
+                    if (namespaceName.isGlobal()) {
+                        // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+                        return validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return validateClusterOwnershipAsync(namespaceName.getCluster())
+                                .thenCompose(ignore -> validateClusterForTenantAsync(namespaceName.getTenant(),
+                                        namespaceName.getCluster()));
+                    }
+                })
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies->{
+                    String bundleRange = getBundleRange(bundleName);
+                    return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
+                            authoritative, false)
+                            .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
+                                    getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
 
-        pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
-                getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)
-                .thenRun(() -> {
-                    log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
-                    asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(ex -> {
-            if (ex.getCause() instanceof IllegalArgumentException) {
-                log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
-                        bundleRange, ex.getMessage());
-                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                        "Split bundle failed due to invalid request"));
-            } else {
-                log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex);
-                asyncResponse.resume(new RestException(ex.getCause()));
-            }
-            return null;
-        });
+                });
     }
 
-    protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String bundleRange, List<String> topics) {
+    protected CompletableFuture<TopicHashPositions> internalGetTopicHashPositionsAsync(String bundleRange,
+                                                                                       List<String> topics) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topics, bundleRange);
         }
-        validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-        NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                false, true);
-        pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete(
-                (allTopicsInThisBundle, throwable) -> {
-                    if (throwable != null) {
-                        log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
-                                namespaceName, bundle);
-                        asyncResponse.resume(new RestException(throwable));
-                    }
-                    // if topics is empty, return all topics' hash position in this bundle
-                    Map<String, Long> topicHashPositions = new HashMap<>();
-                    if (topics == null || topics.size() == 0) {
-                        allTopicsInThisBundle.forEach(t -> {
-                            topicHashPositions.put(t,
-                                    pulsar().getNamespaceService().getNamespaceBundleFactory()
-                                            .getLongHashCode(t));
-                        });
-                    } else {
-                        for (String topic : topics.stream().map(Codec::decode).collect(Collectors.toList())) {
-                            TopicName topicName = TopicName.get(topic);
-                            // partitioned topic
-                            if (topicName.getPartitionIndex() == -1) {
-                                allTopicsInThisBundle.stream()
-                                        .filter(t -> TopicName.get(t).getPartitionedTopicName()
-                                                .equals(TopicName.get(topic).getPartitionedTopicName()))
-                                        .forEach(partition -> {
-                                            topicHashPositions.put(partition,
-                                                    pulsar().getNamespaceService().getNamespaceBundleFactory()
-                                                            .getLongHashCode(partition));
-                                        });
-                            } else { // topic partition
-                                if (allTopicsInThisBundle.contains(topicName.toString())) {
-                                    topicHashPositions.put(topic,
-                                            pulsar().getNamespaceService().getNamespaceBundleFactory()
-                                                    .getLongHashCode(topic));
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies -> {
+                    return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
+                            false, true)
+                            .thenCompose(nsBundle ->
+                                    pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
+                            .thenApply(allTopicsInThisBundle -> {
+                                Map<String, Long> topicHashPositions = new HashMap<>();
+                                if (topics == null || topics.size() == 0) {
+                                    allTopicsInThisBundle.forEach(t -> {
+                                        topicHashPositions.put(t,
+                                                pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                                        .getLongHashCode(t));
+                                    });
+                                } else {
+                                    for (String topic : topics.stream().map(Codec::decode).toList()) {
+                                        TopicName topicName = TopicName.get(topic);
+                                        // partitioned topic
+                                        if (topicName.getPartitionIndex() == -1) {
+                                            allTopicsInThisBundle.stream()
+                                                    .filter(t -> TopicName.get(t).getPartitionedTopicName()
+                                                            .equals(TopicName.get(topic).getPartitionedTopicName()))
+                                                    .forEach(partition -> {
+                                                        topicHashPositions.put(partition,
+                                                                pulsar().getNamespaceService()
+                                                                        .getNamespaceBundleFactory()
+                                                                        .getLongHashCode(partition));
+                                                    });
+                                        } else { // topic partition
+                                            if (allTopicsInThisBundle.contains(topicName.toString())) {
+                                                topicHashPositions.put(topic,
+                                                        pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                                                .getLongHashCode(topic));
+                                            }
+                                        }
+                                    }
                                 }
-                            }
-                        }
-                    }
-                    asyncResponse.resume(
-                            new TopicHashPositions(namespaceName.toString(), bundleRange, topicHashPositions));
+                                return new TopicHashPositions(namespaceName.toString(), bundleRange,
+                                        topicHashPositions);
+                            });
                 });
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4264e5669d0..d3f1fb9a9e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -809,15 +809,22 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
-    public BundlesData getBundlesData(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
-        validatePoliciesReadOnlyAccess();
-        validateNamespaceName(property, cluster, namespace);
-        validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_BUNDLE);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        return policies.bundles;
+    public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
+                              @PathParam("property") String property,
+                              @PathParam("cluster") String cluster,
+                              @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        validatePoliciesReadOnlyAccessAsync()
+                .thenCompose(__ -> validateNamespaceOperationAsync(NamespaceName.get(property, namespace),
+                        NamespaceOperation.GET_BUNDLE))
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.bundles))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get bundle data for namespace {} ", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -899,15 +906,27 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("unload") @DefaultValue("false") boolean unload,
             @QueryParam("splitBoundaries") @DefaultValue("") List<Long> splitBoundaries) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            internalSplitNamespaceBundle(asyncResponse, bundleRange,
-                    authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalSplitNamespaceBundleAsync(bundleRange,
+                authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully split namespace bundle {}", clientAppId(), bundleRange);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to split namespace bundle {}/{}",
+                                clientAppId(), namespaceName, bundleRange, ex);
+                    }
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof IllegalArgumentException) {
+                        asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED,
+                                "Split bundle failed due to invalid request"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @GET
@@ -923,17 +942,32 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("topics") List<String> topics,
             @Suspended AsyncResponse asyncResponse) {
         validateNamespaceName(property, cluster, namespace);
-        internalGetTopicHashPositions(asyncResponse, bundle, topics);
+        internalGetTopicHashPositionsAsync(bundle, topics)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
+                                namespaceName, bundle);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{property}/{cluster}/{namespace}/publishRate")
     @ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void setPublishRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, PublishRate publishRate) {
+    public void setPublishRate(@Suspended AsyncResponse asyncResponse,
+                               @PathParam("property") String property, @PathParam("cluster") String cluster,
+                               @PathParam("namespace") String namespace, PublishRate publishRate) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetPublishRate(publishRate);
+        internalSetPublishRateAsync(publishRate)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -943,20 +977,37 @@ public class Namespaces extends NamespacesBase {
                     + "-1 means msg-publish-rate or byte-publish-rate not configured in publish-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
-    public PublishRate getPublishRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
+    public void getPublishRate(@Suspended AsyncResponse asyncResponse,
+                               @PathParam("property") String property,
+                               @PathParam("cluster") String cluster,
+                               @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetPublishRate();
+        internalGetPublishRateAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("Failed to get publish rate for namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{property}/{cluster}/{namespace}/dispatchRate")
     @ApiOperation(hidden = true, value = "Set dispatch-rate throttling for all topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void setDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, DispatchRateImpl dispatchRate) {
+    public void setDispatchRate(@Suspended AsyncResponse asyncResponse,
+                                @PathParam("property") String property,
+                                @PathParam("cluster") String cluster,
+                                @PathParam("namespace") String namespace, DispatchRateImpl dispatchRate) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetTopicDispatchRate(dispatchRate);
+        internalSetTopicDispatchRateAsync(dispatchRate)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to update the dispatchRate for cluster on namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index aa3698e1c7d..6d8fe5bc1d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -58,7 +58,6 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
-import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -735,15 +734,21 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
-    public BundlesData getBundlesData(@PathParam("tenant") String tenant,
-            @PathParam("namespace") String namespace) {
-        validatePoliciesReadOnlyAccess();
+    public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
+                                      @PathParam("tenant") String tenant,
+                                      @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_BUNDLE);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        return policies.bundles;
+        validatePoliciesReadOnlyAccessAsync()
+                .thenCompose(__ -> validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace),
+                        NamespaceOperation.GET_BUNDLE))
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.bundles))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get bundle data for namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -826,15 +831,26 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("unload") @DefaultValue("false") boolean unload,
             @QueryParam("splitAlgorithmName") String splitAlgorithmName,
             @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalSplitNamespaceBundle(asyncResponse,
-                    bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalSplitNamespaceBundleAsync(bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully split namespace bundle {}", clientAppId(), bundleRange);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to split namespace bundle {}/{} due to {}",
+                                clientAppId(), namespaceName, bundleRange, ex.getMessage());
+                    }
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof IllegalArgumentException) {
+                        asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED,
+                                "Split bundle failed due to invalid request"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @GET
@@ -849,8 +865,17 @@ public class Namespaces extends NamespacesBase {
             @PathParam("bundle") String bundleRange,
             @QueryParam("topics") List<String> topics,
             @Suspended AsyncResponse asyncResponse) {
-            validateNamespaceName(tenant, namespace);
-            internalGetTopicHashPositions(asyncResponse, bundleRange, topics);
+        validateNamespaceName(tenant, namespace);
+        internalGetTopicHashPositionsAsync(bundleRange, topics)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
+                                namespaceName, bundleRange);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 11a16d4d17f..2369a0af4bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -644,10 +644,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 .numBundles(boundaries.size() - 1)
                 .build();
         createBundledTestNamespaces(this.testTenant, this.testLocalCluster, "test-bundled-namespace-1", bundle);
-        BundlesData responseData = namespaces.getBundlesData(testTenant, this.testLocalCluster,
-                "test-bundled-namespace-1");
-
-        assertEquals(responseData, bundle);
+        assertEquals(asyncRequests(ctx -> namespaces.getBundlesData(ctx, testTenant, this.testLocalCluster,
+                "test-bundled-namespace-1")), bundle);
     }
 
     @Test
@@ -917,7 +915,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
             verify(response, timeout(5000).times(1)).resume(captor.capture());
             // verify split bundles
-            BundlesData bundlesData = namespaces.getBundlesData(testTenant, testLocalCluster, bundledNsLocal);
+            BundlesData bundlesData = (BundlesData) asyncRequests(ctx -> namespaces.getBundlesData(ctx, testTenant,
+                    testLocalCluster, bundledNsLocal));
             assertNotNull(bundlesData);
             assertEquals(bundlesData.getBoundaries().size(), 3);
             assertEquals(bundlesData.getBoundaries().get(0), "0x00000000");