You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/06/01 10:37:25 UTC
[pulsar] branch master updated: [improve][broker] Make some methods of `ClusterBase` pure async. (#15847)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d4be6bb0520 [improve][broker] Make some methods of `ClusterBase` pure async. (#15847)
d4be6bb0520 is described below
commit d4be6bb05204c270122e159c5c361cd7a2412805
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Wed Jun 1 18:37:14 2022 +0800
[improve][broker] Make some methods of `ClusterBase` pure async. (#15847)
---
.../pulsar/broker/resources/ClusterResources.java | 5 ++
.../pulsar/broker/admin/impl/ClustersBase.java | 84 +++++++++++-----------
2 files changed, 45 insertions(+), 44 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 91639578d26..5bde79e6321 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -162,6 +162,11 @@ public class ClusterResources extends BaseResources<ClusterData> {
delete(path);
}
+ public CompletableFuture<Void> deleteFailureDomainAsync(String clusterName, String domainName) {
+ String path = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName);
+ return deleteAsync(path);
+ }
+
public CompletableFuture<Void> deleteFailureDomainsAsync(String clusterName) {
String failureDomainPath = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN);
return existsAsync(failureDomainPath)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 034d7b5f9fe..7797f5dcaac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -834,31 +834,26 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 412, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
- public FailureDomainImpl getDomain(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ public void getDomain(
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The failure domain name",
- required = true
- )
+ @ApiParam(value = "The failure domain name", required = true)
@PathParam("domainName") String domainName
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
-
- try {
- return clusterResources().getFailureDomainResources().getFailureDomain(cluster, domainName)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+ .thenCompose(__ -> clusterResources().getFailureDomainResources()
+ .getFailureDomainAsync(cluster, domainName))
+ .thenAccept(domain -> {
+ FailureDomainImpl failureDomain = domain.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Domain " + domainName + " for cluster " + cluster + " does not exist"));
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, e);
- throw new RestException(e);
- }
+ asyncResponse.resume(failureDomain);
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -874,30 +869,31 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 500, message = "Internal server error")
})
public void deleteFailureDomain(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The failure domain name",
- required = true
- )
+ @ApiParam(value = "The failure domain name", required = true)
@PathParam("domainName") String domainName
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
-
- try {
- clusterResources().getFailureDomainResources().deleteFailureDomain(cluster, domainName);
- } catch (NotFoundException nne) {
- log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster);
- throw new RestException(Status.NOT_FOUND,
- "Domain-name " + domainName + " or cluster " + cluster + " does not exist");
- } catch (Exception e) {
- log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, e);
- throw new RestException(e);
- }
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+ .thenCompose(__ -> clusterResources()
+ .getFailureDomainResources().deleteFailureDomainAsync(cluster, domainName))
+ .thenAccept(__ -> {
+ log.info("[{}] Successful delete domain {} in cluster {}", clientAppId(), domainName, cluster);
+ asyncResponse.resume(Response.ok().build());
+ }).exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof NotFoundException) {
+ log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Domain-name " + domainName + " or cluster " + cluster + " does not exist"));
+ return null;
+ }
+ log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
private CompletableFuture<Void> validateBrokerExistsInOtherDomain(final String cluster,