You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/27 09:10:30 UTC
[pulsar] branch master updated: [improve][broker] Make some methods of ClusterBase pure async. (#15685)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d9340aca6e [improve][broker] Make some methods of ClusterBase pure async. (#15685)
8d9340aca6e is described below
commit 8d9340aca6e5581183153738338744cb59106e11
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Fri May 27 17:10:20 2022 +0800
[improve][broker] Make some methods of ClusterBase pure async. (#15685)
---
.../pulsar/broker/resources/ClusterResources.java | 12 +
.../pulsar/broker/admin/impl/ClustersBase.java | 274 +++++++++++----------
.../org/apache/pulsar/broker/admin/AdminTest.java | 5 +-
3 files changed, 156 insertions(+), 135 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 7dbf2839b39..91639578d26 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -141,10 +141,17 @@ public class ClusterResources extends BaseResources<ClusterData> {
super(store, clazz, operationTimeoutSec);
}
+ public CompletableFuture<List<String>> listFailureDomainsAsync(String clusterName) {
+ return getChildrenAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN));
+ }
public List<String> listFailureDomains(String clusterName) throws MetadataStoreException {
return getChildren(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN));
}
+ public CompletableFuture<Optional<FailureDomainImpl>> getFailureDomainAsync(String clusterName,
+ String domainName) {
+ return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName));
+ }
public Optional<FailureDomainImpl> getFailureDomain(String clusterName, String domainName)
throws MetadataStoreException {
return get(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName));
@@ -183,6 +190,11 @@ public class ClusterResources extends BaseResources<ClusterData> {
delete(failureDomainPath);
}
+ public CompletableFuture<Void> setFailureDomainWithCreateAsync(String clusterName, String domainName,
+ Function<Optional<FailureDomainImpl>, FailureDomainImpl> createFunction) {
+ return setWithCreateAsync(
+ joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName), createFunction);
+ }
public void setFailureDomainWithCreate(String clusterName, String domainName,
Function<Optional<FailureDomainImpl>, FailureDomainImpl> createFunction)
throws MetadataStoreException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index a1fd8a24733..034d7b5f9fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.admin.impl;
import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED;
-import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
@@ -31,6 +30,7 @@ import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -41,6 +41,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
@@ -48,9 +49,9 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.bookkeeper.common.util.JsonUtil;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.Constants;
@@ -686,46 +687,38 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 500, message = "Internal server error.")
})
public void deleteNamespaceIsolationPolicy(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The namespace isolation policy name",
- required = true
- )
+ @ApiParam(value = "The namespace isolation policy name", required = true)
@PathParam("policyName") String policyName
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
- validatePoliciesReadOnlyAccess();
-
- try {
-
- NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
- .getIsolationDataPolicies(cluster).orElseGet(() -> {
- try {
- namespaceIsolationPolicies().setIsolationDataWithCreate(cluster,
- (p) -> Collections.emptyMap());
- return new NamespaceIsolationPolicies();
- } catch (Exception e) {
- throw new RestException(e);
- }
- });
-
- nsIsolationPolicies.deletePolicy(policyName);
- namespaceIsolationPolicies().setIsolationData(cluster, old -> nsIsolationPolicies.getPolicies());
- } catch (NotFoundException nne) {
- log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
- cluster);
- throw new RestException(Status.NOT_FOUND,
- "NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
- } catch (Exception e) {
- log.error("[{}] Failed to update brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
- policyName, e);
- throw new RestException(e);
- }
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster))
+ .thenCompose(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.map(CompletableFuture::completedFuture)
+ .orElseGet(() -> namespaceIsolationPolicies()
+ .setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
+ .thenApply(__ -> new NamespaceIsolationPolicies())))
+ .thenCompose(policies -> {
+ policies.deletePolicy(policyName);
+ return namespaceIsolationPolicies().setIsolationDataAsync(cluster, old -> policies.getPolicies());
+ }).thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof NotFoundException) {
+ log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist",
+ clientAppId(), cluster);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
+ return null;
+ }
+ log.error("[{}] Failed to update brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
+ policyName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -742,38 +735,37 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setFailureDomain(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The failure domain name",
- required = true
- )
+ @ApiParam(value = "The failure domain name", required = true)
@PathParam("domainName") String domainName,
- @ApiParam(
- value = "The configuration data of a failure domain",
- required = true
- )
- FailureDomainImpl domain
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
- validateBrokerExistsInOtherDomain(cluster, domainName, domain);
-
- try {
- clusterResources().getFailureDomainResources()
- .setFailureDomainWithCreate(cluster, domainName, old -> domain);
- } catch (NotFoundException nne) {
- log.warn("[{}] Failed to update domain {}. clusters {} Does not exist", clientAppId(), cluster,
- domainName);
- throw new RestException(Status.NOT_FOUND,
- "Domain " + domainName + " for cluster " + cluster + " does not exist");
- } catch (Exception e) {
- log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), cluster, domainName, e);
- throw new RestException(e);
- }
+ @ApiParam(value = "The configuration data of a failure domain", required = true) FailureDomainImpl domain
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+ .thenCompose(__ -> validateBrokerExistsInOtherDomain(cluster, domainName, domain))
+ .thenCompose(__ -> clusterResources().getFailureDomainResources()
+ .setFailureDomainWithCreateAsync(cluster, domainName, old -> domain))
+ .thenAccept(__ -> {
+ log.info("[{}] Successful set failure domain {} for cluster {}",
+ clientAppId(), domainName, cluster);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof NotFoundException) {
+ log.warn("[{}] Failed to update domain {}. clusters {} Does not exist", clientAppId(), cluster,
+ domainName);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Domain " + domainName + " for cluster " + cluster + " does not exist"));
+ return null;
+ }
+ log.error("[{}] Failed to update clusters/{}/domainName/{}",
+ clientAppId(), cluster, domainName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -788,34 +780,45 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 500, message = "Internal server error")
})
- public Map<String, FailureDomainImpl> getFailureDomains(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ public void getFailureDomains(
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster
- ) throws Exception {
- validateSuperUserAccess();
-
- Map<String, FailureDomainImpl> domains = Maps.newHashMap();
- try {
- FailureDomainResources fdr = clusterResources().getFailureDomainResources();
- for (String domainName : fdr.listFailureDomains(cluster)) {
- try {
- Optional<FailureDomainImpl> domain = fdr.getFailureDomain(cluster, domainName);
- domain.ifPresent(failureDomain -> domains.put(domainName, failureDomain));
- } catch (Exception e) {
- log.warn("Failed to get domain {}", domainName, e);
- }
- }
- } catch (NotFoundException e) {
- log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), cluster, e);
- return Collections.emptyMap();
- } catch (Exception e) {
- log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, e);
- throw new RestException(e);
- }
- return domains;
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> clusterResources().getFailureDomainResources()
+ .listFailureDomainsAsync(cluster)
+ .thenCompose(domainNames -> {
+ List<CompletableFuture<Pair<String, Optional<FailureDomainImpl>>>> futures =
+ domainNames.stream()
+ .map(domainName -> clusterResources().getFailureDomainResources()
+ .getFailureDomainAsync(cluster, domainName)
+ .thenApply(failureDomainImpl -> Pair.of(domainName, failureDomainImpl))
+ .exceptionally(ex -> {
+ log.warn("Failed to get domain {}", domainName, ex);
+ return null;
+ })).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures)
+ .thenApply(unused -> futures.stream()
+ .map(CompletableFuture::join)
+ .filter(Objects::nonNull)
+ .filter(v -> v.getRight().isPresent())
+ .collect(Collectors.toMap(Pair::getLeft, v -> v.getRight().get())));
+ }).exceptionally(ex -> {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof NotFoundException) {
+ log.warn("[{}] Failure-domain is not configured for cluster {}",
+ clientAppId(), cluster, ex);
+ return Collections.emptyMap();
+ }
+ throw FutureUtil.wrapToCompletionException(ex);
+ })
+ ).thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -897,42 +900,49 @@ public class ClustersBase extends AdminResource {
}
}
- private void validateBrokerExistsInOtherDomain(final String cluster, final String inputDomainName,
- final FailureDomainImpl inputDomain) {
- if (inputDomain != null && inputDomain.brokers != null) {
- try {
- for (String domainName : clusterResources().getFailureDomainResources()
- .listFailureDomains(cluster)) {
- if (inputDomainName.equals(domainName)) {
- continue;
- }
- try {
- Optional<FailureDomainImpl> domain =
- clusterResources().getFailureDomainResources().getFailureDomain(cluster, domainName);
- if (domain.isPresent() && domain.get().brokers != null) {
- List<String> duplicateBrokers = domain.get().brokers.stream().parallel()
- .filter(inputDomain.brokers::contains).collect(Collectors.toList());
- if (!duplicateBrokers.isEmpty()) {
- throw new RestException(Status.CONFLICT,
- duplicateBrokers + " already exists in " + domainName);
- }
- }
- } catch (Exception e) {
- if (e instanceof RestException) {
- throw e;
- }
- log.warn("Failed to get domain {}", domainName, e);
- }
- }
- } catch (NotFoundException e) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Domain is not configured for cluster", clientAppId(), e);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to get domains for cluster {}", clientAppId(), e);
- throw new RestException(e);
- }
+ private CompletableFuture<Void> validateBrokerExistsInOtherDomain(final String cluster,
+ final String inputDomainName,
+ final FailureDomainImpl inputDomain) {
+ if (inputDomain == null || inputDomain.brokers == null) {
+ return CompletableFuture.completedFuture(null);
}
+ return clusterResources().getFailureDomainResources()
+ .listFailureDomainsAsync(cluster)
+ .thenCompose(domainNames -> {
+ List<CompletableFuture<Void>> futures = domainNames.stream()
+ .filter(domainName -> !domainName.equals(inputDomainName))
+ .map(domainName -> clusterResources()
+ .getFailureDomainResources().getFailureDomainAsync(cluster, domainName)
+ .thenAccept(failureDomainOpt -> {
+ if (failureDomainOpt.isPresent()
+ && CollectionUtils.isNotEmpty(failureDomainOpt.get().getBrokers())) {
+ List<String> duplicateBrokers = failureDomainOpt.get()
+ .getBrokers().stream().parallel()
+ .filter(inputDomain.brokers::contains)
+ .collect(Collectors.toList());
+ if (CollectionUtils.isNotEmpty(duplicateBrokers)) {
+ throw new RestException(Status.CONFLICT,
+ duplicateBrokers + " already exists in " + domainName);
+ }
+ }
+ }).exceptionally(ex -> {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException) {
+ throw FutureUtil.wrapToCompletionException(ex);
+ }
+ if (realCause instanceof NotFoundException) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Domain is not configured for cluster",
+ clientAppId(), ex);
+ }
+ return null;
+ }
+ log.warn("Failed to get domain {}", domainName, ex);
+ return null;
+ })
+ ).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures);
+ });
}
private static final Logger log = LoggerFactory.getLogger(ClustersBase.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index ff41edfb48b..c6b1fbd65bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -264,7 +264,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
assertEquals(e.getResponse().getStatus(), 412);
}
- clusters.deleteNamespaceIsolationPolicy("use", "policy1");
+ asyncRequests(ctx -> clusters.deleteNamespaceIsolationPolicy(ctx, "use", "policy1"));
assertTrue(((Map<String, NamespaceIsolationDataImpl>) asyncRequests(ctx ->
clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty());
@@ -403,8 +403,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
- verify(clusters, times(23)).validateSuperUserAccessAsync();
- verify(clusters, times(1)).validateSuperUserAccess();
+ verify(clusters, times(24)).validateSuperUserAccessAsync();
}
@Test