You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/06/01 10:37:25 UTC

[pulsar] branch master updated: [improve][broker] Make some methods of `ClusterBase` pure async. (#15847)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d4be6bb0520 [improve][broker] Make some methods of `ClusterBase` pure async. (#15847)
d4be6bb0520 is described below

commit d4be6bb05204c270122e159c5c361cd7a2412805
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Wed Jun 1 18:37:14 2022 +0800

    [improve][broker] Make some methods of `ClusterBase` pure async. (#15847)
---
 .../pulsar/broker/resources/ClusterResources.java  |  5 ++
 .../pulsar/broker/admin/impl/ClustersBase.java     | 84 +++++++++++-----------
 2 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 91639578d26..5bde79e6321 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -162,6 +162,11 @@ public class ClusterResources extends BaseResources<ClusterData> {
             delete(path);
         }
 
+        public CompletableFuture<Void> deleteFailureDomainAsync(String clusterName, String domainName) {
+            String path = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName);
+            return deleteAsync(path);
+        }
+
         public CompletableFuture<Void> deleteFailureDomainsAsync(String clusterName) {
             String failureDomainPath = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN);
             return existsAsync(failureDomainPath)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 034d7b5f9fe..7797f5dcaac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -834,31 +834,26 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 412, message = "Cluster doesn't exist"),
         @ApiResponse(code = 500, message = "Internal server error")
     })
-    public FailureDomainImpl getDomain(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+    public void getDomain(
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The failure domain name",
-            required = true
-        )
+        @ApiParam(value = "The failure domain name", required = true)
         @PathParam("domainName") String domainName
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-
-        try {
-            return clusterResources().getFailureDomainResources().getFailureDomain(cluster, domainName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+                .thenCompose(__ -> clusterResources().getFailureDomainResources()
+                        .getFailureDomainAsync(cluster, domainName))
+                .thenAccept(domain -> {
+                    FailureDomainImpl failureDomain = domain.orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "Domain " + domainName + " for cluster " + cluster + " does not exist"));
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, e);
-            throw new RestException(e);
-        }
+                    asyncResponse.resume(failureDomain);
+                }).exceptionally(ex -> {
+                    log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -874,30 +869,31 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 500, message = "Internal server error")
     })
     public void deleteFailureDomain(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The failure domain name",
-            required = true
-        )
+        @ApiParam(value = "The failure domain name", required = true)
         @PathParam("domainName") String domainName
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-
-        try {
-            clusterResources().getFailureDomainResources().deleteFailureDomain(cluster, domainName);
-        } catch (NotFoundException nne) {
-            log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster);
-            throw new RestException(Status.NOT_FOUND,
-                    "Domain-name " + domainName + " or cluster " + cluster + " does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, e);
-            throw new RestException(e);
-        }
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+                .thenCompose(__ -> clusterResources()
+                        .getFailureDomainResources().deleteFailureDomainAsync(cluster, domainName))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successful delete domain {} in cluster {}", clientAppId(), domainName, cluster);
+                    asyncResponse.resume(Response.ok().build());
+                }).exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof NotFoundException) {
+                        log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster);
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "Domain-name " + domainName + " or cluster " + cluster + " does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     private CompletableFuture<Void> validateBrokerExistsInOtherDomain(final String cluster,