You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/27 09:10:30 UTC

[pulsar] branch master updated: [improve][broker] Make some methods of ClusterBase pure async. (#15685)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d9340aca6e [improve][broker] Make some methods of ClusterBase pure async.  (#15685)
8d9340aca6e is described below

commit 8d9340aca6e5581183153738338744cb59106e11
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Fri May 27 17:10:20 2022 +0800

    [improve][broker] Make some methods of ClusterBase pure async.  (#15685)
---
 .../pulsar/broker/resources/ClusterResources.java  |  12 +
 .../pulsar/broker/admin/impl/ClustersBase.java     | 274 +++++++++++----------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   5 +-
 3 files changed, 156 insertions(+), 135 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 7dbf2839b39..91639578d26 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -141,10 +141,17 @@ public class ClusterResources extends BaseResources<ClusterData> {
             super(store, clazz, operationTimeoutSec);
         }
 
+        public CompletableFuture<List<String>> listFailureDomainsAsync(String clusterName) {
+            return getChildrenAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN));
+        }
         public List<String> listFailureDomains(String clusterName) throws MetadataStoreException {
             return getChildren(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN));
         }
 
+        public CompletableFuture<Optional<FailureDomainImpl>> getFailureDomainAsync(String clusterName,
+                                                                                    String domainName) {
+            return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName));
+        }
         public Optional<FailureDomainImpl> getFailureDomain(String clusterName, String domainName)
                 throws MetadataStoreException {
             return get(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName));
@@ -183,6 +190,11 @@ public class ClusterResources extends BaseResources<ClusterData> {
             delete(failureDomainPath);
         }
 
+        public CompletableFuture<Void> setFailureDomainWithCreateAsync(String clusterName, String domainName,
+                                           Function<Optional<FailureDomainImpl>, FailureDomainImpl> createFunction) {
+            return setWithCreateAsync(
+                    joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName), createFunction);
+        }
         public void setFailureDomainWithCreate(String clusterName, String domainName,
                                                Function<Optional<FailureDomainImpl>, FailureDomainImpl> createFunction)
                 throws MetadataStoreException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index a1fd8a24733..034d7b5f9fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.admin.impl;
 
 import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED;
-import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
@@ -31,6 +30,7 @@ import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -41,6 +41,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
@@ -48,9 +49,9 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.Constants;
@@ -686,46 +687,38 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 500, message = "Internal server error.")
     })
     public void deleteNamespaceIsolationPolicy(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The namespace isolation policy name",
-            required = true
-        )
+        @ApiParam(value = "The namespace isolation policy name", required = true)
         @PathParam("policyName") String policyName
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-        validatePoliciesReadOnlyAccess();
-
-        try {
-
-            NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
-                    .getIsolationDataPolicies(cluster).orElseGet(() -> {
-                        try {
-                            namespaceIsolationPolicies().setIsolationDataWithCreate(cluster,
-                                    (p) -> Collections.emptyMap());
-                            return new NamespaceIsolationPolicies();
-                        } catch (Exception e) {
-                            throw new RestException(e);
-                        }
-                    });
-
-            nsIsolationPolicies.deletePolicy(policyName);
-            namespaceIsolationPolicies().setIsolationData(cluster, old -> nsIsolationPolicies.getPolicies());
-        } catch (NotFoundException nne) {
-            log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
-                    cluster);
-            throw new RestException(Status.NOT_FOUND,
-                    "NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
-                    policyName, e);
-            throw new RestException(e);
-        }
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster))
+                .thenCompose(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.map(CompletableFuture::completedFuture)
+                        .orElseGet(() -> namespaceIsolationPolicies()
+                                .setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
+                                .thenApply(__ -> new NamespaceIsolationPolicies())))
+                .thenCompose(policies -> {
+                    policies.deletePolicy(policyName);
+                    return namespaceIsolationPolicies().setIsolationDataAsync(cluster, old -> policies.getPolicies());
+                }).thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist",
+                                clientAppId(), cluster);
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to update brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
+                            policyName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -742,38 +735,37 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 500, message = "Internal server error.")
     })
     public void setFailureDomain(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The failure domain name",
-            required = true
-        )
+        @ApiParam(value = "The failure domain name", required = true)
         @PathParam("domainName") String domainName,
-        @ApiParam(
-            value = "The configuration data of a failure domain",
-            required = true
-        )
-                FailureDomainImpl domain
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-        validateBrokerExistsInOtherDomain(cluster, domainName, domain);
-
-        try {
-            clusterResources().getFailureDomainResources()
-                    .setFailureDomainWithCreate(cluster, domainName, old -> domain);
-        } catch (NotFoundException nne) {
-            log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), cluster,
-                    domainName);
-            throw new RestException(Status.NOT_FOUND,
-                    "Domain " + domainName + " for cluster " + cluster + " does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), cluster, domainName, e);
-            throw new RestException(e);
-        }
+        @ApiParam(value = "The configuration data of a failure domain", required = true) FailureDomainImpl domain
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+                .thenCompose(__ -> validateBrokerExistsInOtherDomain(cluster, domainName, domain))
+                .thenCompose(__ -> clusterResources().getFailureDomainResources()
+                        .setFailureDomainWithCreateAsync(cluster, domainName, old -> domain))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successful set failure domain {} for cluster {}",
+                            clientAppId(), domainName, cluster);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), cluster,
+                                domainName);
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "Domain " + domainName + " for cluster " + cluster + " does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to update clusters/{}/domainName/{}",
+                            clientAppId(), cluster, domainName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -788,34 +780,45 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 500, message = "Internal server error")
     })
-    public Map<String, FailureDomainImpl> getFailureDomains(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+    public void getFailureDomains(
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster
-    ) throws Exception {
-        validateSuperUserAccess();
-
-        Map<String, FailureDomainImpl> domains = Maps.newHashMap();
-        try {
-            FailureDomainResources fdr = clusterResources().getFailureDomainResources();
-            for (String domainName : fdr.listFailureDomains(cluster)) {
-                try {
-                    Optional<FailureDomainImpl> domain = fdr.getFailureDomain(cluster, domainName);
-                    domain.ifPresent(failureDomain -> domains.put(domainName, failureDomain));
-                } catch (Exception e) {
-                    log.warn("Failed to get domain {}", domainName, e);
-                }
-            }
-        } catch (NotFoundException e) {
-            log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), cluster, e);
-            return Collections.emptyMap();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
-        return domains;
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> clusterResources().getFailureDomainResources()
+                        .listFailureDomainsAsync(cluster)
+                        .thenCompose(domainNames -> {
+                            List<CompletableFuture<Pair<String, Optional<FailureDomainImpl>>>> futures =
+                                domainNames.stream()
+                                    .map(domainName -> clusterResources().getFailureDomainResources()
+                                            .getFailureDomainAsync(cluster, domainName)
+                                            .thenApply(failureDomainImpl -> Pair.of(domainName, failureDomainImpl))
+                                            .exceptionally(ex -> {
+                                                log.warn("Failed to get domain {}", domainName, ex);
+                                                return null;
+                                            })).collect(Collectors.toList());
+                            return FutureUtil.waitForAll(futures)
+                                    .thenApply(unused -> futures.stream()
+                                            .map(CompletableFuture::join)
+                                            .filter(Objects::nonNull)
+                                            .filter(v -> v.getRight().isPresent())
+                                            .collect(Collectors.toMap(Pair::getLeft, v -> v.getRight().get())));
+                        }).exceptionally(ex -> {
+                            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                            if (realCause instanceof NotFoundException) {
+                                log.warn("[{}] Failure-domain is not configured for cluster {}",
+                                        clientAppId(), cluster, ex);
+                                return Collections.emptyMap();
+                            }
+                            throw FutureUtil.wrapToCompletionException(ex);
+                        })
+                ).thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -897,42 +900,49 @@ public class ClustersBase extends AdminResource {
         }
     }
 
-    private void validateBrokerExistsInOtherDomain(final String cluster, final String inputDomainName,
-            final FailureDomainImpl inputDomain) {
-        if (inputDomain != null && inputDomain.brokers != null) {
-            try {
-                for (String domainName : clusterResources().getFailureDomainResources()
-                        .listFailureDomains(cluster)) {
-                    if (inputDomainName.equals(domainName)) {
-                        continue;
-                    }
-                    try {
-                        Optional<FailureDomainImpl> domain =
-                                clusterResources().getFailureDomainResources().getFailureDomain(cluster, domainName);
-                        if (domain.isPresent() && domain.get().brokers != null) {
-                            List<String> duplicateBrokers = domain.get().brokers.stream().parallel()
-                                    .filter(inputDomain.brokers::contains).collect(Collectors.toList());
-                            if (!duplicateBrokers.isEmpty()) {
-                                throw new RestException(Status.CONFLICT,
-                                        duplicateBrokers + " already exists in " + domainName);
-                            }
-                        }
-                    } catch (Exception e) {
-                        if (e instanceof RestException) {
-                            throw e;
-                        }
-                        log.warn("Failed to get domain {}", domainName, e);
-                    }
-                }
-            } catch (NotFoundException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Domain is not configured for cluster", clientAppId(), e);
-                }
-            } catch (Exception e) {
-                log.error("[{}] Failed to get domains for cluster {}", clientAppId(), e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> validateBrokerExistsInOtherDomain(final String cluster,
+                                                                      final String inputDomainName,
+                                                                      final FailureDomainImpl inputDomain) {
+        if (inputDomain == null || inputDomain.brokers == null) {
+            return CompletableFuture.completedFuture(null);
         }
+        return clusterResources().getFailureDomainResources()
+                .listFailureDomainsAsync(cluster)
+                .thenCompose(domainNames -> {
+                    List<CompletableFuture<Void>> futures = domainNames.stream()
+                            .filter(domainName -> !domainName.equals(inputDomainName))
+                            .map(domainName -> clusterResources()
+                                    .getFailureDomainResources().getFailureDomainAsync(cluster, domainName)
+                                    .thenAccept(failureDomainOpt -> {
+                                        if (failureDomainOpt.isPresent()
+                                                && CollectionUtils.isNotEmpty(failureDomainOpt.get().getBrokers())) {
+                                            List<String> duplicateBrokers = failureDomainOpt.get()
+                                                    .getBrokers().stream().parallel()
+                                                    .filter(inputDomain.brokers::contains)
+                                                    .collect(Collectors.toList());
+                                            if (CollectionUtils.isNotEmpty(duplicateBrokers)) {
+                                                throw new RestException(Status.CONFLICT,
+                                                        duplicateBrokers + " already exists in " + domainName);
+                                            }
+                                        }
+                                    }).exceptionally(ex -> {
+                                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                                        if (realCause instanceof WebApplicationException) {
+                                            throw FutureUtil.wrapToCompletionException(ex);
+                                        }
+                                        if (realCause instanceof NotFoundException) {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("[{}] Domain is not configured for cluster",
+                                                        clientAppId(), ex);
+                                            }
+                                            return null;
+                                        }
+                                        log.warn("Failed to get domain {}", domainName, ex);
+                                        return null;
+                                    })
+                            ).collect(Collectors.toList());
+                    return FutureUtil.waitForAll(futures);
+                });
     }
 
     private static final Logger log = LoggerFactory.getLogger(ClustersBase.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index ff41edfb48b..c6b1fbd65bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -264,7 +264,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), 412);
         }
 
-        clusters.deleteNamespaceIsolationPolicy("use", "policy1");
+        asyncRequests(ctx -> clusters.deleteNamespaceIsolationPolicy(ctx, "use", "policy1"));
         assertTrue(((Map<String, NamespaceIsolationDataImpl>) asyncRequests(ctx ->
                 clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty());
 
@@ -403,8 +403,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
-        verify(clusters, times(23)).validateSuperUserAccessAsync();
-        verify(clusters, times(1)).validateSuperUserAccess();
+        verify(clusters, times(24)).validateSuperUserAccessAsync();
     }
 
     @Test