You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/13 03:11:07 UTC
[pulsar] branch master updated: [improve][broker] Make some methods of `ClusterBase` pure async. (#15437)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aa03ccd2ccd [improve][broker] Make some methods of `ClusterBase` pure async. (#15437)
aa03ccd2ccd is described below
commit aa03ccd2ccd2ac664a463b16eb6d6a402e76b120
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Fri May 13 11:10:57 2022 +0800
[improve][broker] Make some methods of `ClusterBase` pure async. (#15437)
---
.../pulsar/broker/admin/impl/ClustersBase.java | 194 ++++++++++-----------
.../pulsar/broker/namespace/NamespaceService.java | 13 +-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 1 +
.../org/apache/pulsar/broker/admin/AdminTest.java | 14 +-
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 1 +
5 files changed, 109 insertions(+), 114 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 6ffd2ee0058..80309b2a026 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -32,7 +32,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -397,31 +396,49 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
- public Map<String, ? extends NamespaceIsolationData> getNamespaceIsolationPolicies(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
- @PathParam("cluster") String cluster
- ) throws Exception {
- validateSuperUserAccess();
- if (!clusterResources().clusterExists(cluster)) {
- throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist.");
- }
+ public void getNamespaceIsolationPolicies(
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster, Status.NOT_FOUND))
+ .thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
- try {
- NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
- .getIsolationDataPolicies(cluster)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
- "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
- // construct the response to Namespace isolation data map
- return nsIsolationPolicies.getPolicies();
- } catch (Exception e) {
- log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
- throw new RestException(e);
- }
+ /**
+ * Verify that the cluster exists.
+ * For compatibility to avoid breaking changes, we can specify a REST status code when it doesn't exist.
+ * @param cluster Cluster name
+ * @param notExistStatus REST status code
+ */
+ private CompletableFuture<Void> validateClusterExistAsync(String cluster, Status notExistStatus) {
+ return clusterResources().clusterExistsAsync(cluster)
+ .thenAccept(clusterExist -> {
+ if (!clusterExist) {
+ throw new RestException(notExistStatus, "Cluster " + cluster + " does not exist.");
+ }
+ });
+ }
+
+ private CompletableFuture<Map<String, NamespaceIsolationDataImpl>> internalGetNamespaceIsolationPolicies(
+ String cluster) {
+ return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster)
+ .thenApply(namespaceIsolationPolicies -> {
+ if (!namespaceIsolationPolicies.isPresent()) {
+ throw new RestException(Status.NOT_FOUND,
+ "NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
+ }
+ return namespaceIsolationPolicies.get().getPolicies();
+ });
}
+
@GET
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(
@@ -435,40 +452,28 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
- public NamespaceIsolationData getNamespaceIsolationPolicy(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
- @PathParam("cluster") String cluster,
- @ApiParam(
- value = "The name of the namespace isolation policy",
- required = true
- )
+ public void getNamespaceIsolationPolicy(
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster,
+ @ApiParam(value = "The name of the namespace isolation policy", required = true)
@PathParam("policyName") String policyName
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
-
- try {
- NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
- .getIsolationDataPolicies(cluster)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
- "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
- // construct the response to Namespace isolation data map
- if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
- log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}",
- clientAppId(), policyName, cluster);
- throw new RestException(Status.NOT_FOUND,
- "Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
- }
- return nsIsolationPolicies.getPolicies().get(policyName);
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster, e);
- throw new RestException(e);
- }
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED))
+ .thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster))
+ .thenAccept(policies -> {
+ // construct the response to Namespace isolation data map
+ if (!policies.containsKey(policyName)) {
+ throw new RestException(Status.NOT_FOUND,
+ "Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
+ }
+ asyncResponse.resume(policies.get(policyName));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}",
+ clientAppId(), cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -485,53 +490,44 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
- public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ public void getBrokersWithNamespaceIsolationPolicy(
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
- validateSuperUserAccess();
- validateClusterExists(cluster);
-
- Set<String> availableBrokers;
- Map<String, ? extends NamespaceIsolationData> nsPolicies;
- try {
- availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
- } catch (Exception e) {
- log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e);
- throw new RestException(e);
- }
- try {
- Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPolicies()
- .getIsolationDataPolicies(cluster);
- if (!nsPoliciesResult.isPresent()) {
- throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster);
- }
- nsPolicies = nsPoliciesResult.get().getPolicies();
- } catch (Exception e) {
- log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e);
- throw new RestException(e);
- }
- return availableBrokers.stream().map(broker -> {
- BrokerNamespaceIsolationData.Builder brokerIsolationData = BrokerNamespaceIsolationData.builder()
- .brokerName(broker);
- if (nsPolicies != null) {
- List<String> namespaceRegexes = new ArrayList<>();
- nsPolicies.forEach((name, policyData) -> {
- NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
- if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
- namespaceRegexes.addAll(policyData.getNamespaces());
- if (nsPolicyImpl.isPrimaryBroker(broker)) {
- brokerIsolationData.primary(true);
- }
- }
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED))
+ .thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
+ .thenCompose(availableBrokers -> internalGetNamespaceIsolationPolicies(cluster)
+ .thenApply(policies -> availableBrokers.stream()
+ .map(broker -> internalGetBrokerNsIsolationData(broker, policies))
+ .collect(Collectors.toList())))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
});
+ }
- brokerIsolationData.namespaceRegex(namespaceRegexes);
- }
+ private BrokerNamespaceIsolationData internalGetBrokerNsIsolationData(
+ String broker,
+ Map<String, NamespaceIsolationDataImpl> policies) {
+ BrokerNamespaceIsolationData.Builder brokerIsolationData =
+ BrokerNamespaceIsolationData.builder().brokerName(broker);
+ if (policies == null) {
return brokerIsolationData.build();
- }).collect(Collectors.toList());
+ }
+ List<String> namespaceRegexes = new ArrayList<>();
+ policies.forEach((name, policyData) -> {
+ NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
+ if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
+ namespaceRegexes.addAll(policyData.getNamespaces());
+ brokerIsolationData.primary(nsPolicyImpl.isPrimaryBroker(broker));
+ brokerIsolationData.policyName(name);
+ }
+ });
+ brokerIsolationData.namespaceRegex(namespaceRegexes);
+ return brokerIsolationData.build();
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 558d408f1d3..8d6480db964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -733,8 +733,10 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpacesStatusAsync() {
- return getLocalNamespaceIsolationPoliciesAsync()
- .thenCompose(namespaceIsolationPolicies -> {
+ return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
+ .getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
+ .thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
+ .thenCompose(namespaceIsolationPolicies -> {
Collection<CompletableFuture<OwnedBundle>> futures =
ownershipCache.getOwnedBundlesAsync().values();
return FutureUtil.waitForAll(futures)
@@ -768,13 +770,6 @@ public class NamespaceService implements AutoCloseable {
return nsOwnedStatus;
}
- private CompletableFuture<NamespaceIsolationPolicies> getLocalNamespaceIsolationPoliciesAsync() {
- String localCluster = pulsar.getConfiguration().getClusterName();
- return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
- .getIsolationDataPoliciesAsync(localCluster)
- .thenApply(nsIsolationPolicies -> nsIsolationPolicies.orElseGet(NamespaceIsolationPolicies::new));
- }
-
public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
try {
// Does ZooKeeper says that the namespace is disabled?
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a6b89112634..a1a674eac9c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1029,6 +1029,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
assertEquals(brokerIsolationDataList.get(0).getBrokerName(), brokerAddress);
assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().size(), 1);
assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().get(0), namespaceRegex);
+ assertEquals(brokerIsolationDataList.get(0).getPolicyName(), policyName1);
BrokerNamespaceIsolationDataImpl brokerIsolationData = (BrokerNamespaceIsolationDataImpl) admin.clusters()
.getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 4677ccb521e..bfc29250028 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -202,6 +202,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
}
@Test
+ @SuppressWarnings("unchecked")
public void clusters() throws Exception {
assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet());
verify(clusters, never()).validateSuperUserAccessAsync();
@@ -239,7 +240,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());
try {
- clusters.getNamespaceIsolationPolicies("use");
+ asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
@@ -259,7 +260,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.build();
AsyncResponse response = mock(AsyncResponse.class);
clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData);
- clusters.getNamespaceIsolationPolicies("use");
+ asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
try {
asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
@@ -269,7 +270,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
}
clusters.deleteNamespaceIsolationPolicy("use", "policy1");
- assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty());
+ assertTrue(((Map<String, NamespaceIsolationDataImpl>) asynRequests(ctx ->
+ clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty());
asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet());
@@ -289,7 +291,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
}
try {
- clusters.getNamespaceIsolationPolicies("use");
+ asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
@@ -406,8 +408,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
- verify(clusters, times(18)).validateSuperUserAccessAsync();
- verify(clusters, times(6)).validateSuperUserAccess();
+ verify(clusters, times(22)).validateSuperUserAccessAsync();
+ verify(clusters, times(2)).validateSuperUserAccess();
}
Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 6ff29a1bc19..6289dca6e68 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -287,6 +287,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
List<BrokerNamespaceIsolationData> isoList = admin.clusters().getBrokersWithNamespaceIsolationPolicy("use");
assertEquals(isoList.size(), 1);
assertTrue(isoList.get(0).isPrimary());
+ assertEquals(isoList.get(0).getPolicyName(), policyName1);
// verify update of primary
nsPolicyData1.getPrimary().remove(0);