You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/24 01:25:27 UTC

[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15685: [improve][broker] Make some methods of ClusterBase pure async.

mattisonchao commented on code in PR #15685:
URL: https://github.com/apache/pulsar/pull/15685#discussion_r879978967


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -742,38 +735,37 @@ public void deleteNamespaceIsolationPolicy(
         @ApiResponse(code = 500, message = "Internal server error.")
     })
     public void setFailureDomain(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The failure domain name",
-            required = true
-        )
+        @ApiParam(value = "The failure domain name", required = true)
         @PathParam("domainName") String domainName,
-        @ApiParam(
-            value = "The configuration data of a failure domain",
-            required = true
-        )
-                FailureDomainImpl domain
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-        validateBrokerExistsInOtherDomain(cluster, domainName, domain);
-
-        try {
-            clusterResources().getFailureDomainResources()
-                    .setFailureDomainWithCreate(cluster, domainName, old -> domain);
-        } catch (NotFoundException nne) {
-            log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), cluster,
-                    domainName);
-            throw new RestException(Status.NOT_FOUND,
-                    "Domain " + domainName + " for cluster " + cluster + " does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), cluster, domainName, e);
-            throw new RestException(e);
-        }
+        @ApiParam(value = "The configuration data of a failure domain", required = true) FailureDomainImpl domain
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+                .thenCompose(__ -> validateBrokerExistsInOtherDomain(cluster, domainName, domain))
+                .thenCompose(__ -> clusterResources().getFailureDomainResources()
+                        .setFailureDomainWithCreateAsync(cluster, domainName, old -> domain))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successful set failure domain {} for cluster {}",
+                            clientAppId(), domainName, cluster);
+                    asyncResponse.resume(Response.ok().build());

Review Comment:
   Fixed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -686,46 +687,38 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIs
         @ApiResponse(code = 500, message = "Internal server error.")
     })
     public void deleteNamespaceIsolationPolicy(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The namespace isolation policy name",
-            required = true
-        )
+        @ApiParam(value = "The namespace isolation policy name", required = true)
         @PathParam("policyName") String policyName
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-        validatePoliciesReadOnlyAccess();
-
-        try {
-
-            NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
-                    .getIsolationDataPolicies(cluster).orElseGet(() -> {
-                        try {
-                            namespaceIsolationPolicies().setIsolationDataWithCreate(cluster,
-                                    (p) -> Collections.emptyMap());
-                            return new NamespaceIsolationPolicies();
-                        } catch (Exception e) {
-                            throw new RestException(e);
-                        }
-                    });
-
-            nsIsolationPolicies.deletePolicy(policyName);
-            namespaceIsolationPolicies().setIsolationData(cluster, old -> nsIsolationPolicies.getPolicies());
-        } catch (NotFoundException nne) {
-            log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
-                    cluster);
-            throw new RestException(Status.NOT_FOUND,
-                    "NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
-                    policyName, e);
-            throw new RestException(e);
-        }
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster))
+                .thenCompose(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.map(CompletableFuture::completedFuture)
+                        .orElseGet(() -> namespaceIsolationPolicies()
+                                .setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
+                                .thenApply(__ -> new NamespaceIsolationPolicies())))
+                .thenCompose(policies -> {
+                    policies.deletePolicy(policyName);
+                    return namespaceIsolationPolicies().setIsolationDataAsync(cluster, old -> policies.getPolicies());
+                }).thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org