You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/28 00:57:17 UTC

[GitHub] [pulsar] mattisonchao opened a new pull request, #15358: [improve][broker] Make some methods of `ClusterBase` pure async.

mattisonchao opened a new pull request, #15358:
URL: https://github.com/apache/pulsar/pull/15358

   ### Motivation
   
   See PIP #14365  and change tracker #15043. 
   
   This PR is still draft because it relies on #15318
   
   ### Modifications
   
   - Make `ClusterBase` `setPeerClusterNames / getPeerCluster / deleteCluster` methods to pure async.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   - [x] `no-need-doc` 
   (Please explain why)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao merged pull request #15358: [improve][broker] Make some methods of `ClusterBase` pure async.

Posted by GitBox <gi...@apache.org>.
mattisonchao merged PR #15358:
URL: https://github.com/apache/pulsar/pull/15358


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15358: [improve][broker] Make some methods of `ClusterBase` pure async.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15358:
URL: https://github.com/apache/pulsar/pull/15358#discussion_r868717783


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -346,54 +339,51 @@ public Set<String> getPeerCluster(
             @ApiResponse(code = 412, message = "Cluster is not empty."),
             @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public void deleteCluster(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
-        @PathParam("cluster") String cluster
-    ) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
+    public void deleteCluster(@Suspended AsyncResponse asyncResponse,
+                              @ApiParam(value = "The cluster name", required = true)
+                              @PathParam("cluster") String cluster) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> internalDeleteClusterAsync(cluster))
+                .thenAccept(__ -> {
+                    log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
+                    asyncResponse.resume(Response.ok().build());
+                }).exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
 
+    private CompletableFuture<Void> internalDeleteClusterAsync(String cluster) {
         // Check that the cluster is not used by any tenant (eg: no namespaces provisioned there)
-        boolean isClusterUsed = false;
-        try {
-            isClusterUsed = pulsar().getPulsarResources().getClusterResources().isClusterUsed(cluster);
-
-            // check the namespaceIsolationPolicies associated with the cluster
-            Optional<NamespaceIsolationPolicies> nsIsolationPolicies =
-                    namespaceIsolationPolicies().getIsolationDataPolicies(cluster);
-
-            // Need to delete the isolation policies if present
-            if (nsIsolationPolicies.isPresent()) {
-                if (nsIsolationPolicies.get().getPolicies().isEmpty()) {
-                    namespaceIsolationPolicies().deleteIsolationData(cluster);
-                } else {
-                    isClusterUsed = true;
-                }
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
-
-        if (isClusterUsed) {
-            log.warn("[{}] Failed to delete cluster {} - Cluster not empty", clientAppId(), cluster);
-            throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
-        }
-
-        try {
-            clusterResources().getFailureDomainResources().deleteFailureDomains(cluster);
-            clusterResources().deleteCluster(cluster);
-            log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
-        } catch (NotFoundException e) {
-            log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
-            throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
+        return pulsar().getPulsarResources().getClusterResources().isClusterUsedAsync(cluster)
+                .thenCompose(isClusterUsed -> {
+                    if (isClusterUsed) {
+                        throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
+                    }
+                    // check the namespaceIsolationPolicies associated with the cluster
+                    return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster);
+                }).thenCompose(nsIsolationPoliciesOpt -> {
+                    if (nsIsolationPoliciesOpt.isPresent()) {
+                        if (!nsIsolationPoliciesOpt.get().getPolicies().isEmpty()) {
+                            throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
+                        }
+                        return namespaceIsolationPolicies().deleteIsolationDataAsync(cluster);
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }).thenCompose(unused -> {
+                    // Need to delete the isolation policies if present

Review Comment:
   fixed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -238,68 +239,62 @@ public void updateCluster(
             @ApiResponse(code = 412, message = "Peer cluster doesn't exist."),
             @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public void setPeerClusterNames(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
-        @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The list of peer cluster names",
-            required = true,
-            examples = @Example(
-                value = @ExampleProperty(
-                    mediaType = MediaType.APPLICATION_JSON,
-                    value =
-                          "[\n"
-                        + "   'cluster-a',\n"
-                        + "   'cluster-b'\n"
-                        + "]"
-                )
-            )
-        )
-        LinkedHashSet<String> peerClusterNames
-    ) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
+    public void setPeerClusterNames(@Suspended AsyncResponse asyncResponse,
+                                    @ApiParam(value = "The cluster name", required = true)
+                                    @PathParam("cluster") String cluster,
+                                    @ApiParam(
+                                        value = "The list of peer cluster names",
+                                        required = true,
+                                        examples = @Example(
+                                        value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
+                                        value = "[\n"
+                                                + "   'cluster-a',\n"
+                                                + "   'cluster-b'\n"
+                                                + "]")))
+                                    LinkedHashSet<String> peerClusterNames) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> innerSetPeerClusterNamesAsync(cluster, peerClusterNames))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully added peer-cluster {} for {}",
+                            clientAppId(), peerClusterNames, cluster);
+                    asyncResponse.resume(Response.ok().build());

Review Comment:
   fixed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -346,54 +339,51 @@ public Set<String> getPeerCluster(
             @ApiResponse(code = 412, message = "Cluster is not empty."),
             @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public void deleteCluster(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
-        @PathParam("cluster") String cluster
-    ) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
+    public void deleteCluster(@Suspended AsyncResponse asyncResponse,
+                              @ApiParam(value = "The cluster name", required = true)
+                              @PathParam("cluster") String cluster) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> internalDeleteClusterAsync(cluster))
+                .thenAccept(__ -> {
+                    log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
+                    asyncResponse.resume(Response.ok().build());

Review Comment:
   fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15358: [improve][broker] Make some methods of `ClusterBase` pure async.

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15358:
URL: https://github.com/apache/pulsar/pull/15358#discussion_r868005471


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -238,68 +239,62 @@ public void updateCluster(
             @ApiResponse(code = 412, message = "Peer cluster doesn't exist."),
             @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public void setPeerClusterNames(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
-        @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The list of peer cluster names",
-            required = true,
-            examples = @Example(
-                value = @ExampleProperty(
-                    mediaType = MediaType.APPLICATION_JSON,
-                    value =
-                          "[\n"
-                        + "   'cluster-a',\n"
-                        + "   'cluster-b'\n"
-                        + "]"
-                )
-            )
-        )
-        LinkedHashSet<String> peerClusterNames
-    ) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
+    public void setPeerClusterNames(@Suspended AsyncResponse asyncResponse,
+                                    @ApiParam(value = "The cluster name", required = true)
+                                    @PathParam("cluster") String cluster,
+                                    @ApiParam(
+                                        value = "The list of peer cluster names",
+                                        required = true,
+                                        examples = @Example(
+                                        value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
+                                        value = "[\n"
+                                                + "   'cluster-a',\n"
+                                                + "   'cluster-b'\n"
+                                                + "]")))
+                                    LinkedHashSet<String> peerClusterNames) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> innerSetPeerClusterNamesAsync(cluster, peerClusterNames))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully added peer-cluster {} for {}",
+                            clientAppId(), peerClusterNames, cluster);
+                    asyncResponse.resume(Response.ok().build());

Review Comment:
   The annotation ApiResponses shows the response should be no content(code=204). I'm not sure if we need to use the `Response.ok()`, or if we need to modify the doc.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -346,54 +339,51 @@ public Set<String> getPeerCluster(
             @ApiResponse(code = 412, message = "Cluster is not empty."),
             @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public void deleteCluster(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
-        @PathParam("cluster") String cluster
-    ) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
+    public void deleteCluster(@Suspended AsyncResponse asyncResponse,
+                              @ApiParam(value = "The cluster name", required = true)
+                              @PathParam("cluster") String cluster) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> internalDeleteClusterAsync(cluster))
+                .thenAccept(__ -> {
+                    log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
+                    asyncResponse.resume(Response.ok().build());
+                }).exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
 
+    private CompletableFuture<Void> internalDeleteClusterAsync(String cluster) {
         // Check that the cluster is not used by any tenant (eg: no namespaces provisioned there)
-        boolean isClusterUsed = false;
-        try {
-            isClusterUsed = pulsar().getPulsarResources().getClusterResources().isClusterUsed(cluster);
-
-            // check the namespaceIsolationPolicies associated with the cluster
-            Optional<NamespaceIsolationPolicies> nsIsolationPolicies =
-                    namespaceIsolationPolicies().getIsolationDataPolicies(cluster);
-
-            // Need to delete the isolation policies if present
-            if (nsIsolationPolicies.isPresent()) {
-                if (nsIsolationPolicies.get().getPolicies().isEmpty()) {
-                    namespaceIsolationPolicies().deleteIsolationData(cluster);
-                } else {
-                    isClusterUsed = true;
-                }
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
-
-        if (isClusterUsed) {
-            log.warn("[{}] Failed to delete cluster {} - Cluster not empty", clientAppId(), cluster);
-            throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
-        }
-
-        try {
-            clusterResources().getFailureDomainResources().deleteFailureDomains(cluster);
-            clusterResources().deleteCluster(cluster);
-            log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
-        } catch (NotFoundException e) {
-            log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
-            throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
+        return pulsar().getPulsarResources().getClusterResources().isClusterUsedAsync(cluster)
+                .thenCompose(isClusterUsed -> {
+                    if (isClusterUsed) {
+                        throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
+                    }
+                    // check the namespaceIsolationPolicies associated with the cluster
+                    return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster);
+                }).thenCompose(nsIsolationPoliciesOpt -> {
+                    if (nsIsolationPoliciesOpt.isPresent()) {
+                        if (!nsIsolationPoliciesOpt.get().getPolicies().isEmpty()) {
+                            throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
+                        }
+                        return namespaceIsolationPolicies().deleteIsolationDataAsync(cluster);
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }).thenCompose(unused -> {
+                    // Need to delete the isolation policies if present

Review Comment:
   It seems that this comment is not here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -346,54 +339,51 @@ public Set<String> getPeerCluster(
             @ApiResponse(code = 412, message = "Cluster is not empty."),
             @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public void deleteCluster(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
-        @PathParam("cluster") String cluster
-    ) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
+    public void deleteCluster(@Suspended AsyncResponse asyncResponse,
+                              @ApiParam(value = "The cluster name", required = true)
+                              @PathParam("cluster") String cluster) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> internalDeleteClusterAsync(cluster))
+                .thenAccept(__ -> {
+                    log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
+                    asyncResponse.resume(Response.ok().build());

Review Comment:
   Please make it consistent with the annotation `ApiResponses`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org