You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/06 23:07:22 UTC

[pulsar] branch master updated: Fix some async method problems at PersistentTopicsBase. (#6483)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 47ca8e6  Fix some async method problems at PersistentTopicsBase. (#6483)
47ca8e6 is described below

commit 47ca8e64d8f36d89af587a7b9a8865622701e109
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Mar 7 07:07:12 2020 +0800

    Fix some async method problems at PersistentTopicsBase. (#6483)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |   20 +-
 .../broker/admin/impl/PersistentTopicsBase.java    | 1047 +++++++++++---------
 2 files changed, 610 insertions(+), 457 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index a216982..98d370f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -571,16 +571,24 @@ public abstract class AdminResource extends PulsarWebResource {
 
     protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
             TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
-        validateClusterOwnership(topicName.getCluster());
-        // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
-        // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
-        // producer/consumer
-        validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
+        try {
+            validateClusterOwnership(topicName.getCluster());
+            // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+            // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+            // producer/consumer
+            validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
+        }
 
         try {
             checkConnect(topicName);
         } catch (WebApplicationException e) {
-            validateAdminAccessForTenant(topicName.getTenant());
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+            } catch (Exception ex) {
+                return FutureUtil.failedFuture(ex);
+            }
         } catch (Exception e) {
             // unknown error marked as internal server error
             log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f2a95d1..d74017c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -539,50 +539,58 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
-        validateAdminAccessForTenant(topicName.getTenant());
-
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
         final CompletableFuture<Void> future = new CompletableFuture<>();
-
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-        final int numPartitions = partitionMetadata.partitions;
-        if (numPartitions > 0) {
-            final AtomicInteger count = new AtomicInteger(numPartitions);
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                try {
-                    pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force)
-                            .whenComplete((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof NotFoundException) {
-                                        // if the sub-topic is not found, the client might not have called create
-                                        // producer or it might have been deleted earlier, so we ignore the 404 error.
-                                        // For all other exception, we fail the delete partition method even if a single
-                                        // partition is failed to be deleted
-                                        if (log.isDebugEnabled()) {
-                                            log.debug("[{}] Partition not found: {}", clientAppId(),
-                                                    topicNamePartition);
+        getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMeta -> {
+            final int numPartitions = partitionMeta.partitions;
+            if (numPartitions > 0) {
+                final AtomicInteger count = new AtomicInteger(numPartitions);
+                for (int i = 0; i < numPartitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force)
+                                .whenComplete((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof NotFoundException) {
+                                            // if the sub-topic is not found, the client might not have called create
+                                            // producer or it might have been deleted earlier, so we ignore the 404 error.
+                                            // For all other exception, we fail the delete partition method even if a single
+                                            // partition is failed to be deleted
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("[{}] Partition not found: {}", clientAppId(),
+                                                        topicNamePartition);
+                                            }
+                                        } else {
+                                            log.error("[{}] Failed to delete partition {}", clientAppId(),
+                                                    topicNamePartition, ex);
+                                            future.completeExceptionally(ex);
+                                            return;
                                         }
                                     } else {
-                                        log.error("[{}] Failed to delete partition {}", clientAppId(),
-                                                topicNamePartition, ex);
-                                        future.completeExceptionally(ex);
-                                        return;
+                                        log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
                                     }
-                                } else {
-                                    log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
-                                }
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
-                    future.completeExceptionally(e);
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
+                                });
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
+                        future.completeExceptionally(e);
+                    }
                 }
+            } else {
+                future.complete(null);
             }
-        } else {
-            future.complete(null);
-        }
+        }).exceptionally(ex -> {
+            future.completeExceptionally(ex);
+            return null;
+        });
 
         future.whenComplete((r, ex) -> {
             if (ex != null) {
@@ -593,6 +601,9 @@ public class PersistentTopicsBase extends AdminResource {
                 } else if (ex instanceof PulsarAdminException) {
                     asyncResponse.resume(new RestException((PulsarAdminException) ex));
                     return;
+                } else if (ex instanceof WebApplicationException) {
+                    asyncResponse.resume(ex);
+                    return;
                 } else {
                     asyncResponse.resume(new RestException(ex));
                     return;
@@ -602,30 +613,48 @@ public class PersistentTopicsBase extends AdminResource {
             // Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
-            try {
-                globalZk().delete(path, -1);
-                globalZkCache().invalidate(path);
-                // Sync data to all quorums and the observers
-                zkSync(path);
-                log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
-                asyncResponse.resume(Response.noContent().build());
-            } catch (KeeperException.NoNodeException nne) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned topic does not exist"));
-            } catch (KeeperException.BadVersionException e) {
-                log.warn("[{}] Failed to delete partitioned topic {}: concurrent modification", clientAppId(),
-                        topicName);
-                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
-            } catch (Exception e) {
-                log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
-                asyncResponse.resume(new RestException(e));
-            }
+
+            globalZk().delete(path, -1, (rc, s, o) -> {
+                if (KeeperException.Code.OK.intValue() == rc) {
+                    try {
+                        globalZkCache().invalidate(path);
+                        globalZk().sync(path, (rc2, s2, ctx) -> {
+                            if (KeeperException.Code.OK.intValue() == rc2) {
+                                log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
+                                asyncResponse.resume(Response.noContent().build());
+                            } else {
+                                log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+                                asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+                            }
+                        }, null);
+                    } catch (Exception e) {
+                        log.error("Failed to delete partitioned topic.", e);
+                        asyncResponse.resume(new RestException(e));
+                    }
+                } else if (KeeperException.Code.NONODE.intValue() == rc) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned topic does not exist"));
+                } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+                    log.warn("[{}] Failed to delete partitioned topic {}: concurrent modification", clientAppId(),
+                            topicName);
+                    asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                } else {
+                    log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+                    asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+                }
+            }, null);
         });
     }
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+        try {
+            if (topicName.isGlobal()) {
+                validateGlobalNamespaceOwnership(namespaceName);
+            }
+        } catch (Exception e) {
+            log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
@@ -651,32 +680,43 @@ public class PersistentTopicsBase extends AdminResource {
                             Throwable th = exception.getCause();
                             if (th instanceof NotFoundException) {
                                 asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
+                            } else if (th instanceof WebApplicationException) {
+                                asyncResponse.resume(th);
                             } else {
                                 log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception);
                                 asyncResponse.resume(new RestException(exception));
                             }
-                            return null;
+                        } else {
+                            asyncResponse.resume(Response.noContent().build());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                         return null;
                     });
                 } else {
                     internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
                 }
             }).exceptionally(t -> {
-                Throwable th = t.getCause();
-                asyncResponse.resume(new RestException(th));
+                log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, t);
+                if (t instanceof WebApplicationException) {
+                    asyncResponse.resume(t);
+                } else {
+                    asyncResponse.resume(new RestException(t));
+                }
                 return null;
             });
         }
     }
 
     private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
-        validateAdminAccessForTenant(topicName.getTenant());
-        validateTopicOwnership(topicName, authoritative);
-
-        Topic topic = getTopicReference(topicName);
+        Topic topic;
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+            validateTopicOwnership(topicName, authoritative);
+            topic = getTopicReference(topicName);
+        } catch (Exception e) {
+            log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
         topic.close(false).whenComplete((r, ex) -> {
             if (ex != null) {
                 log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex);
@@ -726,66 +766,73 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean authoritative) {
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get subscriptions for topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
             internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
         } else {
-            PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                try {
-                    // get the subscriptions only from the 1st partition since all the other partitions will have the same
-                    // subscriptions
-                    pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
-                        .whenComplete((r, ex) -> {
-                            if (ex != null) {
-                                log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
-                                    topicName, ex.getMessage());
-
-                                if (ex instanceof PulsarAdminException) {
-                                    PulsarAdminException pae = (PulsarAdminException) ex;
-                                    if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
-                                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                                            "Internal topics have not been generated yet"));
-                                        return;
-                                    } else {
-                                        asyncResponse.resume(new RestException(pae));
-                                        return;
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    try {
+                        // get the subscriptions only from the 1st partition since all the other partitions will have the same
+                        // subscriptions
+                        pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
+                                .whenComplete((r, ex) -> {
+                                    if (ex != null) {
+                                        log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
+                                                topicName, ex.getMessage());
+
+                                        if (ex instanceof PulsarAdminException) {
+                                            PulsarAdminException pae = (PulsarAdminException) ex;
+                                            if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                                        "Internal topics have not been generated yet"));
+                                                return;
+                                            } else {
+                                                asyncResponse.resume(new RestException(pae));
+                                                return;
+                                            }
+                                        } else {
+                                            asyncResponse.resume(new RestException(ex));
+                                            return;
+                                        }
                                     }
-                                } else {
-                                    asyncResponse.resume(new RestException(ex));
-                                    return;
-                                }
-                            }
-                            final List<String> subscriptions = Lists.newArrayList();
-                            subscriptions.addAll(r);
-                            asyncResponse.resume(subscriptions);
-                            return;
-                        });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
-                    asyncResponse.resume(e);
-                    return;
+                                    final List<String> subscriptions = Lists.newArrayList();
+                                    subscriptions.addAll(r);
+                                    asyncResponse.resume(subscriptions);
+                                });
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
+                        asyncResponse.resume(e);
+                    }
+                } else {
+                    internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
                 }
-            } else {
-                internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
-            }
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get subscriptions for topic {}", clientAppId(), topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
     private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
-        validateAdminOperationOnTopic(authoritative);
-        Topic topic = getTopicReference(topicName);
         try {
+            validateAdminOperationOnTopic(authoritative);
+            Topic topic = getTopicReference(topicName);
             final List<String> subscriptions = Lists.newArrayList();
             topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
             asyncResponse.resume(subscriptions);
-            return;
         } catch (Exception e) {
             log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
-            asyncResponse.resume(new RestException(e));
-            return;
+            resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
 
@@ -810,11 +857,18 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse) {
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+        String managedLedger;
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+            if (topicName.isGlobal()) {
+                validateGlobalNamespaceOwnership(namespaceName);
+            }
+            managedLedger = topicName.getPersistenceNamingEncoding();
+        } catch (Exception e) {
+            log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
         }
-        String managedLedger = topicName.getPersistenceNamingEncoding();
         pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(managedLedger, new ManagedLedgerInfoCallback() {
             @Override
             public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
@@ -832,252 +886,304 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
             boolean perPartition, boolean getPreciseBacklog) {
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions == 0) {
-            throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
-        }
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
-
-        List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
-        for (int i = 0; i < partitionMetadata.partitions; i++) {
             try {
-                topicStatsFutureList
-                        .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog));
-            } catch (PulsarServerException e) {
-                asyncResponse.resume(new RestException(e));
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
                 return;
             }
         }
+        getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            if (partitionMetadata.partitions == 0) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+                return;
+            }
+            PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
+            List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
+            for (int i = 0; i < partitionMetadata.partitions; i++) {
+                try {
+                    topicStatsFutureList
+                            .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog));
+                } catch (PulsarServerException e) {
+                    asyncResponse.resume(new RestException(e));
+                    return;
+                }
+            }
 
-        FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
-            CompletableFuture<TopicStats> statFuture = null;
-            for (int i = 0; i < topicStatsFutureList.size(); i++) {
-                statFuture = topicStatsFutureList.get(i);
-                if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
-                    try {
-                        stats.add(statFuture.get());
-                        if (perPartition) {
-                            stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+            FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+                CompletableFuture<TopicStats> statFuture = null;
+                for (int i = 0; i < topicStatsFutureList.size(); i++) {
+                    statFuture = topicStatsFutureList.get(i);
+                    if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+                        try {
+                            stats.add(statFuture.get());
+                            if (perPartition) {
+                                stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+                            }
+                        } catch (Exception e) {
+                            asyncResponse.resume(new RestException(e));
+                            return null;
                         }
-                    } catch (Exception e) {
-                        asyncResponse.resume(new RestException(e));
-                        return null;
                     }
                 }
-            }
-            if (perPartition && stats.partitions.isEmpty()) {
-                String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                try {
-                    boolean zkPathExists = zkPathExists(path);
-                    if (zkPathExists) {
-                        stats.partitions.put(topicName.toString(), new TopicStats());
-                    } else {
-                        asyncResponse.resume(
-                                new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+                if (perPartition && stats.partitions.isEmpty()) {
+                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
+                    try {
+                        boolean zkPathExists = zkPathExists(path);
+                        if (zkPathExists) {
+                            stats.partitions.put(topicName.toString(), new TopicStats());
+                        } else {
+                            asyncResponse.resume(
+                                    new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+                            return null;
+                        }
+                    } catch (KeeperException | InterruptedException e) {
+                        asyncResponse.resume(new RestException(e));
                         return null;
                     }
-                } catch (KeeperException | InterruptedException e) {
-                    asyncResponse.resume(new RestException(e));
-                    return null;
                 }
-            }
-            asyncResponse.resume(stats);
+                asyncResponse.resume(stats);
+                return null;
+            });
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
             return null;
         });
     }
 
     protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions == 0) {
-            throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
-        }
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
-
-        List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = Lists.newArrayList();
-        for (int i = 0; i < partitionMetadata.partitions; i++) {
             try {
-                topicStatsFutureList.add(pulsar().getAdminClient().topics()
-                        .getInternalStatsAsync((topicName.getPartition(i).toString())));
-            } catch (PulsarServerException e) {
-                asyncResponse.resume(new RestException(e));
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
                 return;
             }
         }
+        getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            if (partitionMetadata.partitions == 0) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+                return;
+            }
 
-        FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
-            CompletableFuture<PersistentTopicInternalStats> statFuture = null;
-            for (int i = 0; i < topicStatsFutureList.size(); i++) {
-                statFuture = topicStatsFutureList.get(i);
-                if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
-                    try {
-                        stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
-                    } catch (Exception e) {
-                        asyncResponse.resume(new RestException(e));
-                        return null;
-                    }
+            PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
+
+            List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = Lists.newArrayList();
+            for (int i = 0; i < partitionMetadata.partitions; i++) {
+                try {
+                    topicStatsFutureList.add(pulsar().getAdminClient().topics()
+                            .getInternalStatsAsync((topicName.getPartition(i).toString())));
+                } catch (PulsarServerException e) {
+                    asyncResponse.resume(new RestException(e));
+                    return;
                 }
             }
-            asyncResponse.resume(!stats.partitions.isEmpty() ? stats
-                    : new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+
+            FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+                CompletableFuture<PersistentTopicInternalStats> statFuture = null;
+                for (int i = 0; i < topicStatsFutureList.size(); i++) {
+                    statFuture = topicStatsFutureList.get(i);
+                    if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+                        try {
+                            stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+                        } catch (Exception e) {
+                            asyncResponse.resume(new RestException(e));
+                            return null;
+                        }
+                    }
+                }
+                asyncResponse.resume(!stats.partitions.isEmpty() ? stats
+                        : new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+                return null;
+            });
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
             return null;
         });
     }
 
     protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
             internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
         } else {
-            PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
-                for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        futures.add(pulsar().getAdminClient().topics()
-                            .deleteSubscriptionAsync(topicNamePartition.toString(), subName));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
-                            e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics()
+                                    .deleteSubscriptionAsync(topicNamePartition.toString(), subName));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
+                                    e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
                     }
-                }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable t = exception.getCause();
-                        if (t instanceof NotFoundException) {
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
-                            return null;
-                        } else if (t instanceof PreconditionFailedException) {
-                            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                                "Subscription has active connected consumers"));
-                            return null;
-                        } else {
-                            log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t);
-                            asyncResponse.resume(new RestException(t));
-                            return null;
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                return null;
+                            } else if (t instanceof PreconditionFailedException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Subscription has active connected consumers"));
+                                return null;
+                            } else {
+                                log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
                         }
-                    }
 
-                    asyncResponse.resume(Response.noContent().build());
-                    return null;
-                });
-            } else {
-                internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
-            }
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
+                }
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
     private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
-        validateAdminAccessForSubscriber(subName, authoritative);
-        Topic topic = getTopicReference(topicName);
         try {
+            validateAdminAccessForSubscriber(subName, authoritative);
+            Topic topic = getTopicReference(topicName);
             Subscription sub = topic.getSubscription(subName);
-            checkNotNull(sub);
+            if (sub == null) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                return;
+            }
             sub.delete().get();
             log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
             asyncResponse.resume(Response.noContent().build());
         } catch (Exception e) {
-            Throwable t = e.getCause();
-            if (e instanceof NullPointerException) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
-            } else if (t instanceof SubscriptionBusyException) {
+            log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
+            if (e.getCause() instanceof SubscriptionBusyException) {
                 asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                     "Subscription has active connected consumers"));
+            } else if (e instanceof WebApplicationException) {
+                asyncResponse.resume(e);
             } else {
                 log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
-                asyncResponse.resume(new RestException(t));
+                asyncResponse.resume(new RestException(e));
             }
         }
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
             internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
         } else {
-            PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
-                for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
-                            subName));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
+                                    subName));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
                     }
-                }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable t = exception.getCause();
-                        if (t instanceof NotFoundException) {
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
-                            return null;
-                        } else {
-                            log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t);
-                            asyncResponse.resume(new RestException(t));
-                            return null;
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                return null;
+                            } else {
+                                log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
                         }
-                    }
 
-                    asyncResponse.resume(Response.noContent().build());
-                    return null;
-                });
-            } else {
-                internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
-            }
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
+                }
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+               return null;
+            });
         }
     }
 
     private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
-        validateAdminAccessForSubscriber(subName, authoritative);
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
-            if (ex != null) {
-                asyncResponse.resume(new RestException(ex));
-                log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex);
-            } else {
-                asyncResponse.resume(Response.noContent().build());
-                log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
-            }
-        };
         try {
+            validateAdminAccessForSubscriber(subName, authoritative);
+            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+            BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
+                if (ex != null) {
+                    asyncResponse.resume(new RestException(ex));
+                    log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex);
+                } else {
+                    asyncResponse.resume(Response.noContent().build());
+                    log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
+                }
+            };
             if (subName.startsWith(topic.getReplicatorPrefix())) {
                 String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                 PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
-                checkNotNull(repl);
+                if (repl == null) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                    return;
+                }
                 repl.clearBacklog().whenComplete(biConsumer);
             } else {
                 PersistentSubscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
+                if (sub == null) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                    return;
+                }
                 sub.clearBacklog().whenComplete(biConsumer);
             }
         } catch (Exception e) {
-            if (e instanceof NullPointerException) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
-            } else {
-                asyncResponse.resume(new RestException(e));
-            }
+            log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
 
@@ -1115,54 +1221,72 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds,
             boolean authoritative) {
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
             internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
         } else {
-            PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
-                // expire messages for each partition topic
-                for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
-                            topicNamePartition.toString(), expireTimeInSeconds));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
-                            topicNamePartition, e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
+                    // expire messages for each partition topic
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
+                                    topicNamePartition.toString(), expireTimeInSeconds));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
+                                    topicNamePartition, e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
                     }
-                }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable t = exception.getCause();
-                        log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
-                            topicName, t);
-                        asyncResponse.resume(new RestException(t));
-                        return null;
-                    }
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
+                                    topicName, t);
+                            asyncResponse.resume(new RestException(t));
+                            return null;
+                        }
 
-                    asyncResponse.resume(Response.noContent().build());
-                    return null;
-                });
-            } else {
-                internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
-            }
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
+                }
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
     private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, int expireTimeInSeconds,
             boolean authoritative) {
         // validate ownership and redirect if current broker is not owner
-        validateAdminOperationOnTopic(authoritative);
+        PersistentTopic topic;
+        try {
+            validateAdminOperationOnTopic(authoritative);
 
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+            topic = (PersistentTopic) getTopicReference(topicName);
+        } catch (Exception e) {
+            log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
         final AtomicReference<Throwable> exception = new AtomicReference<>();
 
         topic.getReplicators().forEach((subName, replicator) -> {
@@ -1198,111 +1322,124 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp,
             boolean authoritative) {
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
             internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
         } else {
-            PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-            final int numPartitions = partitionMetadata.partitions;
-            if (numPartitions > 0) {
-                final CompletableFuture<Void> future = new CompletableFuture<>();
-                final AtomicInteger count = new AtomicInteger(numPartitions);
-                final AtomicInteger failureCount = new AtomicInteger(0);
-                final AtomicReference<Throwable> partitionException = new AtomicReference<>();
-
-                for (int i = 0; i < numPartitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        pulsar().getAdminClient().topics()
-                            .resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
-                            if (ex != null) {
-                                if (ex instanceof PreconditionFailedException) {
-                                    // throw the last exception if all partitions get this error
-                                    // any other exception on partition is reported back to user
-                                    failureCount.incrementAndGet();
-                                    partitionException.set(ex);
-                                } else {
-                                    log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                        clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                    future.completeExceptionally(ex);
-                                    return null;
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                final int numPartitions = partitionMetadata.partitions;
+                if (numPartitions > 0) {
+                    final CompletableFuture<Void> future = new CompletableFuture<>();
+                    final AtomicInteger count = new AtomicInteger(numPartitions);
+                    final AtomicInteger failureCount = new AtomicInteger(0);
+                    final AtomicReference<Throwable> partitionException = new AtomicReference<>();
+
+                    for (int i = 0; i < numPartitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            pulsar().getAdminClient().topics()
+                                    .resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
+                                if (ex != null) {
+                                    if (ex instanceof PreconditionFailedException) {
+                                        // throw the last exception if all partitions get this error
+                                        // any other exception on partition is reported back to user
+                                        failureCount.incrementAndGet();
+                                        partitionException.set(ex);
+                                    } else {
+                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                                clientAppId(), topicNamePartition, subName, timestamp, ex);
+                                        future.completeExceptionally(ex);
+                                        return null;
+                                    }
                                 }
-                            }
 
-                            if (count.decrementAndGet() == 0) {
-                                future.complete(null);
-                            }
+                                if (count.decrementAndGet() == 0) {
+                                    future.complete(null);
+                                }
 
-                            return null;
-                        });
-                    } catch (Exception e) {
-                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                            topicNamePartition, subName, timestamp, e);
-                        future.completeExceptionally(e);
+                                return null;
+                            });
+                        } catch (Exception e) {
+                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
+                                    topicNamePartition, subName, timestamp, e);
+                            future.completeExceptionally(e);
+                        }
                     }
-                }
 
-                future.whenComplete((r, ex) -> {
-                    if (ex != null) {
-                        if (ex instanceof PulsarAdminException) {
-                            asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                            return;
-                        } else {
-                            asyncResponse.resume(new RestException(ex));
-                            return;
+                    future.whenComplete((r, ex) -> {
+                        if (ex != null) {
+                            if (ex instanceof PulsarAdminException) {
+                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
+                                return;
+                            } else {
+                                asyncResponse.resume(new RestException(ex));
+                                return;
+                            }
                         }
-                    }
 
-                    // report an error to user if unable to reset for all partitions
-                    if (failureCount.get() == numPartitions) {
-                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
-                            subName, timestamp, partitionException.get());
-                        asyncResponse.resume(
-                            new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage()));
-                        return;
-                    } else if (failureCount.get() > 0) {
-                        log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(),
-                            topicName, subName, timestamp, partitionException.get());
-                    }
+                        // report an error to user if unable to reset for all partitions
+                        if (failureCount.get() == numPartitions) {
+                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
+                                    subName, timestamp, partitionException.get());
+                            asyncResponse.resume(
+                                    new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage()));
+                            return;
+                        } else if (failureCount.get() > 0) {
+                            log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(),
+                                    topicName, subName, timestamp, partitionException.get());
+                        }
 
-                    asyncResponse.resume(Response.noContent().build());
-                });
-            } else {
-                internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-            }
+                        asyncResponse.resume(Response.noContent().build());
+                    });
+                } else {
+                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
+                }
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
     private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
                                        boolean authoritative) {
-        validateAdminAccessForSubscriber(subName, authoritative);
-        log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-            timestamp);
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        if (topic == null) {
-            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
-            return;
-        }
         try {
+            validateAdminAccessForSubscriber(subName, authoritative);
+            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
+                    timestamp);
+            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+            if (topic == null) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                return;
+            }
             PersistentSubscription sub = topic.getSubscription(subName);
-            checkNotNull(sub);
+            if (sub == null) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                return;
+            }
             sub.resetCursor(timestamp).get();
             log.info("[{}] [{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
                 timestamp);
             asyncResponse.resume(Response.noContent().build());
         } catch (Exception e) {
-            Throwable t = e.getCause();
             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
                 subName, timestamp, e);
-            if (e instanceof NullPointerException) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
-            } else if (e instanceof NotAllowedException) {
+            if (e instanceof NotAllowedException) {
                 asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
-            } else if (t instanceof SubscriptionInvalidCursorPosition) {
+            } else if (e instanceof SubscriptionInvalidCursorPosition) {
                 asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                    "Unable to find position for timestamp specified -" + t.getMessage()));
+                    "Unable to find position for timestamp specified -" + e.getMessage()));
+            } else if (e instanceof WebApplicationException) {
+                asyncResponse.resume(e);
             } else {
                 asyncResponse.resume(new RestException(e));
             }
@@ -1312,7 +1449,13 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
             MessageIdImpl messageId, boolean authoritative, boolean replicated) {
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
         }
         final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.earliest : messageId;
         log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
@@ -1321,86 +1464,88 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isPartitioned()) {
             internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
         } else {
-            PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-            final int numPartitions = partitionMetadata.partitions;
-            if (numPartitions > 0) {
-                final CompletableFuture<Void> future = new CompletableFuture<>();
-                final AtomicInteger count = new AtomicInteger(numPartitions);
-                final AtomicInteger failureCount = new AtomicInteger(0);
-                final AtomicReference<Throwable> partitionException = new AtomicReference<>();
-
-                // Create the subscription on each partition
-                for (int i = 0; i < numPartitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        pulsar().getAdminClient().topics()
-                            .createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId)
-                            .handle((r, ex) -> {
-                                if (ex != null) {
-                                    // fail the operation on unknown exception or if all the partitioned failed due to
-                                    // subscription-already-exist
-                                    if (failureCount.incrementAndGet() == numPartitions
-                                        || !(ex instanceof PulsarAdminException.ConflictException)) {
-                                        partitionException.set(ex);
-                                    }
-                                }
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                final int numPartitions = partitionMetadata.partitions;
+                if (numPartitions > 0) {
+                    final CompletableFuture<Void> future = new CompletableFuture<>();
+                    final AtomicInteger count = new AtomicInteger(numPartitions);
+                    final AtomicInteger failureCount = new AtomicInteger(0);
+                    final AtomicReference<Throwable> partitionException = new AtomicReference<>();
+
+                    // Create the subscription on each partition
+                    for (int i = 0; i < numPartitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            pulsar().getAdminClient().topics()
+                                    .createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId)
+                                    .handle((r, ex) -> {
+                                        if (ex != null) {
+                                            // fail the operation on unknown exception or if all the partitioned failed due to
+                                            // subscription-already-exist
+                                            if (failureCount.incrementAndGet() == numPartitions
+                                                    || !(ex instanceof PulsarAdminException.ConflictException)) {
+                                                partitionException.set(ex);
+                                            }
+                                        }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                        if (count.decrementAndGet() == 0) {
+                                            future.complete(null);
+                                        }
 
-                                return null;
-                            });
-                    } catch (Exception e) {
-                        log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(),
-                            topicNamePartition, subscriptionName, targetMessageId, e);
-                        future.completeExceptionally(e);
+                                        return null;
+                                    });
+                        } catch (Exception e) {
+                            log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(),
+                                    topicNamePartition, subscriptionName, targetMessageId, e);
+                            future.completeExceptionally(e);
+                        }
                     }
-                }
 
-                future.whenComplete((r, ex) -> {
-                    if (ex != null) {
-                        if (ex instanceof PulsarAdminException) {
-                            asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                            return;
-                        } else {
-                            asyncResponse.resume(new RestException(ex));
-                            return;
+                    future.whenComplete((r, ex) -> {
+                        if (ex != null) {
+                            if (ex instanceof PulsarAdminException) {
+                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
+                                return;
+                            } else {
+                                asyncResponse.resume(new RestException(ex));
+                                return;
+                            }
                         }
-                    }
 
-                    if (partitionException.get() != null) {
-                        log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
-                            subscriptionName, targetMessageId, partitionException.get());
-                        if (partitionException.get() instanceof PulsarAdminException) {
-                            asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get()));
-                            return;
-                        } else {
-                            asyncResponse.resume(new RestException(partitionException.get()));
-                            return;
+                        if (partitionException.get() != null) {
+                            log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
+                                    subscriptionName, targetMessageId, partitionException.get());
+                            if (partitionException.get() instanceof PulsarAdminException) {
+                                asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get()));
+                                return;
+                            } else {
+                                asyncResponse.resume(new RestException(partitionException.get()));
+                                return;
+                            }
                         }
-                    }
 
-                    asyncResponse.resume(Response.noContent().build());
-                });
-            } else {
-                internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
-            }
+                        asyncResponse.resume(Response.noContent().build());
+                    });
+                } else {
+                    internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
+                }
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
     private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subscriptionName,
               MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
-        validateAdminAccessForSubscriber(subscriptionName, authoritative);
-
-        PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName);
-
-        if (topic.getSubscriptions().containsKey(subscriptionName)) {
-            asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic"));
-            return;
-        }
-
         try {
+            validateAdminAccessForSubscriber(subscriptionName, authoritative);
+            PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName);
+            if (topic.getSubscriptions().containsKey(subscriptionName)) {
+                asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic"));
+                return;
+            }
             PersistentSubscription subscription = (PersistentSubscription) topic
                 .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
             // Mark the cursor as "inactive" as it was created without a real consumer connected
@@ -1414,10 +1559,10 @@ public class PersistentTopicsBase extends AdminResource {
             if (t instanceof SubscriptionInvalidCursorPosition) {
                 asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                     "Unable to find position for position specified: " + t.getMessage()));
-                return;
+            } else if (e instanceof WebApplicationException) {
+                asyncResponse.resume(e);
             } else {
                 asyncResponse.resume(new RestException(e));
-                return;
             }
         }