You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/02 01:18:02 UTC
[pulsar] branch master updated: Make some operation replication clusters methods in NamespacesBase async (#15760)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eb1daaeb05a Make some operation replication clusters methods in NamespacesBase async (#15760)
eb1daaeb05a is described below
commit eb1daaeb05a351c5b832a3f49e1f96e8ba4df3f1
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Jun 2 09:17:54 2022 +0800
Make some operation replication clusters methods in NamespacesBase async (#15760)
---
.../apache/pulsar/broker/admin/AdminResource.java | 9 +++
.../pulsar/broker/admin/impl/NamespacesBase.java | 81 ++++++++++++----------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 35 +++++++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 28 ++++++--
.../pulsar/broker/web/PulsarWebResource.java | 22 ++++++
.../apache/pulsar/broker/admin/NamespacesTest.java | 78 +++++++++++----------
6 files changed, 166 insertions(+), 87 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 43addc30af7..3743a6e9ed9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -426,6 +426,15 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
+ protected CompletableFuture<Set<String>> clustersAsync() {
+ return clusterResources().listAsync()
+ .thenApply(list ->
+ list.stream()
+ .filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster))
+ .collect(Collectors.toSet())
+ );
+ }
+
protected void setServletContext(ServletContext servletContext) {
this.servletContext = servletContext;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b143a03038b..af6f465e433 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -751,45 +751,52 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected Set<String> internalGetNamespaceReplicationClusters() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ);
-
- if (!namespaceName.isGlobal()) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "Cannot get the replication clusters for a non-global namespace");
- }
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.replication_clusters;
+ protected CompletableFuture<Set<String>> internalGetNamespaceReplicationClustersAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ)
+ .thenAccept(__ -> {
+ if (!namespaceName.isGlobal()) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot get the replication clusters for a non-global namespace");
+ }
+ }).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies -> policies.replication_clusters);
}
- protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) {
- validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
- checkNotNull(clusterIds, "ClusterIds should not be null");
-
- Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
- if (!namespaceName.isGlobal()) {
- throw new RestException(Status.PRECONDITION_FAILED, "Cannot set replication on a non-global namespace");
- }
-
- if (replicationClusterSet.contains("global")) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "Cannot specify global in the list of replication clusters");
- }
-
- Set<String> clusters = clusters();
- for (String clusterId : replicationClusterSet) {
- if (!clusters.contains(clusterId)) {
- throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId);
- }
- validatePeerClusterConflict(clusterId, replicationClusterSet);
- validateClusterForTenant(namespaceName.getTenant(), clusterId);
- }
- updatePolicies(namespaceName, policies ->{
- policies.replication_clusters = replicationClusterSet;
- return policies;
- });
+ @SuppressWarnings("checkstyle:WhitespaceAfter")
+ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<String> clusterIds) {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenApply(__ -> {
+ checkNotNull(clusterIds, "ClusterIds should not be null");
+ if (!namespaceName.isGlobal()) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot set replication on a non-global namespace");
+ }
+ Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
+ if (replicationClusterSet.contains("global")) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot specify global in the list of replication clusters");
+ }
+ return replicationClusterSet;
+ }).thenCompose(replicationClusterSet -> clustersAsync()
+ .thenCompose(clusters -> {
+ List<CompletableFuture<Void>> futures =
+ replicationClusterSet.stream().map(clusterId -> {
+ if (!clusters.contains(clusterId)) {
+ throw new RestException(Status.FORBIDDEN,
+ "Invalid cluster id: " + clusterId);
+ }
+ return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
+ .thenCompose(__ ->
+ validateClusterForTenantAsync(
+ namespaceName.getTenant(), clusterId));
+ }).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
+ }))
+ .thenCompose(replicationClusterSet -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.replication_clusters = replicationClusterSet;
+ return policies;
+ }));
}
protected CompletableFuture<Void> internalSetNamespaceMessageTTLAsync(Integer messageTTL) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index fc207efd7a7..ba95927dcd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -363,13 +363,21 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not global")})
- public Set<String> getNamespaceReplicationClusters(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+ public void getNamespaceReplicationClusters(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- validateNamespacePolicyOperation(NamespaceName.get(property, namespace),
- PolicyName.REPLICATION, PolicyOperation.READ);
-
- return internalGetNamespaceReplicationClusters();
+ validateNamespacePolicyOperationAsync(NamespaceName.get(property, namespace),
+ PolicyName.REPLICATION, PolicyOperation.READ)
+ .thenCompose(__ -> internalGetNamespaceReplicationClustersAsync())
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(e -> {
+ log.error("[{}] Failed to get namespace replication clusters on namespace {}", clientAppId(),
+ namespace, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
}
@POST
@@ -379,10 +387,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Peer-cluster can't be part of replication-cluster"),
@ApiResponse(code = 412, message = "Namespace is not global or invalid cluster ids") })
- public void setNamespaceReplicationClusters(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, List<String> clusterIds) {
+ public void setNamespaceReplicationClusters(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, List<String> clusterIds) {
validateNamespaceName(property, cluster, namespace);
- internalSetNamespaceReplicationClusters(clusterIds);
+ internalSetNamespaceReplicationClusters(clusterIds)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(e -> {
+ log.error("[{}] Failed to set namespace replication clusters on namespace {}", clientAppId(),
+ namespace, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e246c7d86e6..eaa8232032b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -308,10 +308,18 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not global")})
- public Set<String> getNamespaceReplicationClusters(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void getNamespaceReplicationClusters(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetNamespaceReplicationClusters();
+ internalGetNamespaceReplicationClustersAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(e -> {
+ log.error("[{}] Failed to get namespace replication clusters on namespace {}", clientAppId(),
+ namespace, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
}
@POST
@@ -321,11 +329,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Peer-cluster can't be part of replication-cluster"),
@ApiResponse(code = 412, message = "Namespace is not global or invalid cluster ids") })
- public void setNamespaceReplicationClusters(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace,
+ public void setNamespaceReplicationClusters(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "List of replication clusters", required = true) List<String> clusterIds) {
validateNamespaceName(tenant, namespace);
- internalSetNamespaceReplicationClusters(clusterIds);
+ internalSetNamespaceReplicationClusters(clusterIds)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(e -> {
+ log.error("[{}] Failed to set namespace replication clusters on namespace {}",
+ clientAppId(), namespace, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e491f78c9e2..22449e7f119 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -395,6 +395,28 @@ public abstract class PulsarWebResource {
}
}
+ protected CompletableFuture<Void> validatePeerClusterConflictAsync(String clusterName,
+ Set<String> replicationClusters) {
+ return clusterResources().getClusterAsync(clusterName)
+ .thenAccept(data -> {
+ ClusterData clusterData = data.orElseThrow(() -> new RestException(
+ Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName));
+ Set<String> peerClusters = clusterData.getPeerClusterNames();
+ if (peerClusters != null && !peerClusters.isEmpty()) {
+ Sets.SetView<String> conflictPeerClusters =
+ Sets.intersection(peerClusters, replicationClusters);
+ if (!conflictPeerClusters.isEmpty()) {
+ log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", clientAppId(),
+ clusterName, conflictPeerClusters);
+ throw new RestException(Status.CONFLICT,
+ String.format("%s's peer-clusters %s can't be part of replication-clusters %s",
+ clusterName,
+ conflictPeerClusters, replicationClusters));
+ }
+ }
+ });
+ }
+
protected void validateClusterForTenant(String tenant, String cluster) {
TenantInfo tenantInfo;
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index dba0b6bd6ba..e6b928a57a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -486,32 +486,37 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testGlobalNamespaceReplicationConfiguration() throws Exception {
- assertEquals(
- namespaces.getNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
- this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName()),
- Sets.newHashSet());
-
- namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
- this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName(),
- Lists.newArrayList("use", "usw"));
- assertEquals(
- namespaces.getNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
- this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName()),
- Lists.newArrayList("use", "usw"));
+
+ Set<String> repCluster = (Set<String>) asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp,
+ this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+ this.testGlobalNamespaces.get(0).getLocalName()));
+ assertEquals(repCluster, Sets.newHashSet());
+
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp,
+ this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+ this.testGlobalNamespaces.get(0).getLocalName(),
+ Lists.newArrayList("use", "usw")));
+
+ repCluster = (Set<String>) asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp,
+ this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+ this.testGlobalNamespaces.get(0).getLocalName()));
+ assertEquals(repCluster, Lists.newArrayList("use", "usw"));
try {
- namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
- this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName(),
- Lists.newArrayList("use", "invalid-cluster"));
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp,
+ this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+ this.testGlobalNamespaces.get(0).getLocalName(),
+ Lists.newArrayList("use", "invalid-cluster")));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.FORBIDDEN.getStatusCode());
}
try {
- namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
- this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName(),
- Lists.newArrayList("use", "global"));
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp,
+ this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+ this.testGlobalNamespaces.get(0).getLocalName(),
+ Lists.newArrayList("use", "global")));
fail("should have failed");
} catch (RestException e) {
// Ok, global should not be allowed in the list of replication clusters
@@ -519,8 +524,9 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
}
try {
- namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
- this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use", "invalid-cluster"));
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+ this.testGlobalNamespaces.get(0).getLocalName(),
+ Lists.newArrayList("use", "invalid-cluster")));
fail("should have failed");
} catch (RestException e) {
// Ok, invalid-cluster is an invalid cluster id
@@ -531,8 +537,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use", "usc")));
try {
- namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
- this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use", "usw"));
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+ this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use", "usw")));
fail("should have failed");
} catch (RestException e) {
// Ok, usw was not configured in the list of allowed clusters
@@ -544,8 +550,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
mockZooKeeperGlobal.setAlwaysFail(Code.SESSIONEXPIRED);
try {
- namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
- this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use"));
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+ this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use")));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -566,23 +572,24 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
policiesCache.invalidateAll();
store.invalidateAll();
try {
- namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
- this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use"));
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+ this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use")));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 500);
}
try {
- namespaces.getNamespaceReplicationClusters(this.testTenant, "global", "non-existing-ns");
+ asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp, this.testTenant,
+ "global", "non-existing-ns"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}
try {
- namespaces.setNamespaceReplicationClusters(this.testTenant, "global", "non-existing-ns",
- Lists.newArrayList("use"));
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant,
+ "global", "non-existing-ns", Lists.newArrayList("use")));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
@@ -597,24 +604,25 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
store.invalidateAll();
// ensure the ZooKeeper read happens, bypassing the cache
try {
- namespaces.getNamespaceReplicationClusters(this.testTenant, "global",
- this.testGlobalNamespaces.get(0).getLocalName());
+ asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp,
+ this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+ this.testGlobalNamespaces.get(0).getLocalName()));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 500);
}
try {
- namespaces.getNamespaceReplicationClusters(this.testTenant, this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName());
+ asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp, this.testTenant,
+ this.testLocalCluster, this.testLocalNamespaces.get(0).getLocalName()));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
try {
- namespaces.setNamespaceReplicationClusters(this.testTenant, this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName(), Lists.newArrayList("use"));
+ asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName(), Lists.newArrayList("use")));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());