You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/02 01:18:02 UTC

[pulsar] branch master updated: Make some operation replication clusters methods in NamespacesBase async (#15760)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eb1daaeb05a Make some operation replication clusters methods in NamespacesBase async (#15760)
eb1daaeb05a is described below

commit eb1daaeb05a351c5b832a3f49e1f96e8ba4df3f1
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Jun 2 09:17:54 2022 +0800

    Make some operation replication clusters methods in NamespacesBase async (#15760)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  9 +++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 81 ++++++++++++----------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 35 +++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 28 ++++++--
 .../pulsar/broker/web/PulsarWebResource.java       | 22 ++++++
 .../apache/pulsar/broker/admin/NamespacesTest.java | 78 +++++++++++----------
 6 files changed, 166 insertions(+), 87 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 43addc30af7..3743a6e9ed9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -426,6 +426,15 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
+    protected CompletableFuture<Set<String>> clustersAsync() {
+        return clusterResources().listAsync()
+                .thenApply(list ->
+                        list.stream()
+                                .filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster))
+                                .collect(Collectors.toSet())
+                );
+    }
+
     protected void setServletContext(ServletContext servletContext) {
         this.servletContext = servletContext;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b143a03038b..af6f465e433 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -751,45 +751,52 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected Set<String> internalGetNamespaceReplicationClusters() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ);
-
-        if (!namespaceName.isGlobal()) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "Cannot get the replication clusters for a non-global namespace");
-        }
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.replication_clusters;
+    protected CompletableFuture<Set<String>> internalGetNamespaceReplicationClustersAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ)
+                .thenAccept(__ -> {
+                    if (!namespaceName.isGlobal()) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot get the replication clusters for a non-global namespace");
+                    }
+                }).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.replication_clusters);
     }
 
-    protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        checkNotNull(clusterIds, "ClusterIds should not be null");
-
-        Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
-        if (!namespaceName.isGlobal()) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Cannot set replication on a non-global namespace");
-        }
-
-        if (replicationClusterSet.contains("global")) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "Cannot specify global in the list of replication clusters");
-        }
-
-        Set<String> clusters = clusters();
-        for (String clusterId : replicationClusterSet) {
-            if (!clusters.contains(clusterId)) {
-                throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId);
-            }
-            validatePeerClusterConflict(clusterId, replicationClusterSet);
-            validateClusterForTenant(namespaceName.getTenant(), clusterId);
-        }
-        updatePolicies(namespaceName, policies ->{
-            policies.replication_clusters = replicationClusterSet;
-            return policies;
-        });
+    @SuppressWarnings("checkstyle:WhitespaceAfter")
+    protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<String> clusterIds) {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenApply(__ -> {
+                    checkNotNull(clusterIds, "ClusterIds should not be null");
+                    if (!namespaceName.isGlobal()) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot set replication on a non-global namespace");
+                    }
+                    Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
+                    if (replicationClusterSet.contains("global")) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot specify global in the list of replication clusters");
+                    }
+                    return replicationClusterSet;
+                }).thenCompose(replicationClusterSet -> clustersAsync()
+                        .thenCompose(clusters -> {
+                            List<CompletableFuture<Void>> futures =
+                                    replicationClusterSet.stream().map(clusterId -> {
+                                        if (!clusters.contains(clusterId)) {
+                                            throw new RestException(Status.FORBIDDEN,
+                                                    "Invalid cluster id: " + clusterId);
+                                        }
+                                        return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
+                                                .thenCompose(__ ->
+                                                        validateClusterForTenantAsync(
+                                                                namespaceName.getTenant(), clusterId));
+                                    }).collect(Collectors.toList());
+                            return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
+                        }))
+                .thenCompose(replicationClusterSet -> updatePoliciesAsync(namespaceName, policies -> {
+                    policies.replication_clusters = replicationClusterSet;
+                    return policies;
+                }));
     }
 
     protected CompletableFuture<Void> internalSetNamespaceMessageTTLAsync(Integer messageTTL) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index fc207efd7a7..ba95927dcd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -363,13 +363,21 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is not global")})
-    public Set<String> getNamespaceReplicationClusters(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+    public void getNamespaceReplicationClusters(@Suspended AsyncResponse asyncResponse,
+                                                @PathParam("property") String property,
+                                                @PathParam("cluster") String cluster,
+                                                @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        validateNamespacePolicyOperation(NamespaceName.get(property, namespace),
-                PolicyName.REPLICATION, PolicyOperation.READ);
-
-        return internalGetNamespaceReplicationClusters();
+        validateNamespacePolicyOperationAsync(NamespaceName.get(property, namespace),
+                PolicyName.REPLICATION, PolicyOperation.READ)
+                .thenCompose(__ -> internalGetNamespaceReplicationClustersAsync())
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to get namespace replication clusters on namespace {}", clientAppId(),
+                            namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
     }
 
     @POST
@@ -379,10 +387,19 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Peer-cluster can't be part of replication-cluster"),
             @ApiResponse(code = 412, message = "Namespace is not global or invalid cluster ids") })
-    public void setNamespaceReplicationClusters(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, List<String> clusterIds) {
+    public void setNamespaceReplicationClusters(@Suspended AsyncResponse asyncResponse,
+                                                @PathParam("property") String property,
+                                                @PathParam("cluster") String cluster,
+                                                @PathParam("namespace") String namespace, List<String> clusterIds) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetNamespaceReplicationClusters(clusterIds);
+        internalSetNamespaceReplicationClusters(clusterIds)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to set namespace replication clusters on namespace {}", clientAppId(),
+                            namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e246c7d86e6..eaa8232032b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -308,10 +308,18 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is not global")})
-    public Set<String> getNamespaceReplicationClusters(@PathParam("tenant") String tenant,
-            @PathParam("namespace") String namespace) {
+    public void getNamespaceReplicationClusters(@Suspended AsyncResponse asyncResponse,
+                                                @PathParam("tenant") String tenant,
+                                                @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetNamespaceReplicationClusters();
+        internalGetNamespaceReplicationClustersAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to get namespace replication clusters on namespace {}", clientAppId(),
+                            namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
     }
 
     @POST
@@ -321,11 +329,19 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Peer-cluster can't be part of replication-cluster"),
             @ApiResponse(code = 412, message = "Namespace is not global or invalid cluster ids") })
-    public void setNamespaceReplicationClusters(@PathParam("tenant") String tenant,
-            @PathParam("namespace") String namespace,
+    public void setNamespaceReplicationClusters(@Suspended AsyncResponse asyncResponse,
+                                                @PathParam("tenant") String tenant,
+                                                @PathParam("namespace") String namespace,
             @ApiParam(value = "List of replication clusters", required = true) List<String> clusterIds) {
         validateNamespaceName(tenant, namespace);
-        internalSetNamespaceReplicationClusters(clusterIds);
+        internalSetNamespaceReplicationClusters(clusterIds)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to set namespace replication clusters on namespace {}",
+                            clientAppId(), namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e491f78c9e2..22449e7f119 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -395,6 +395,28 @@ public abstract class PulsarWebResource {
         }
     }
 
+    protected CompletableFuture<Void> validatePeerClusterConflictAsync(String clusterName,
+                                                                       Set<String> replicationClusters) {
+        return clusterResources().getClusterAsync(clusterName)
+                .thenAccept(data -> {
+                    ClusterData clusterData = data.orElseThrow(() -> new RestException(
+                            Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName));
+                    Set<String> peerClusters = clusterData.getPeerClusterNames();
+                    if (peerClusters != null && !peerClusters.isEmpty()) {
+                        Sets.SetView<String> conflictPeerClusters =
+                                Sets.intersection(peerClusters, replicationClusters);
+                        if (!conflictPeerClusters.isEmpty()) {
+                            log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", clientAppId(),
+                                    clusterName, conflictPeerClusters);
+                            throw new RestException(Status.CONFLICT,
+                                    String.format("%s's peer-clusters %s can't be part of replication-clusters %s",
+                                            clusterName,
+                                            conflictPeerClusters, replicationClusters));
+                        }
+                    }
+                });
+    }
+
     protected void validateClusterForTenant(String tenant, String cluster) {
         TenantInfo tenantInfo;
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index dba0b6bd6ba..e6b928a57a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -486,32 +486,37 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testGlobalNamespaceReplicationConfiguration() throws Exception {
-        assertEquals(
-                namespaces.getNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                        this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName()),
-                Sets.newHashSet());
-
-        namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName(),
-                Lists.newArrayList("use", "usw"));
-        assertEquals(
-                namespaces.getNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                        this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName()),
-                Lists.newArrayList("use", "usw"));
+
+        Set<String> repCluster = (Set<String>) asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp,
+                this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+                this.testGlobalNamespaces.get(0).getLocalName()));
+        assertEquals(repCluster, Sets.newHashSet());
+
+        asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp,
+                this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+                this.testGlobalNamespaces.get(0).getLocalName(),
+                Lists.newArrayList("use", "usw")));
+
+        repCluster = (Set<String>) asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp,
+                this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+                this.testGlobalNamespaces.get(0).getLocalName()));
+        assertEquals(repCluster, Lists.newArrayList("use", "usw"));
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                    this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName(),
-                    Lists.newArrayList("use", "invalid-cluster"));
+            asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp,
+                    this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+                    this.testGlobalNamespaces.get(0).getLocalName(),
+                    Lists.newArrayList("use", "invalid-cluster")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), Status.FORBIDDEN.getStatusCode());
         }
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                    this.testGlobalNamespaces.get(0).getCluster(), this.testGlobalNamespaces.get(0).getLocalName(),
-                    Lists.newArrayList("use", "global"));
+            asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp,
+                    this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+                    this.testGlobalNamespaces.get(0).getLocalName(),
+                    Lists.newArrayList("use", "global")));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, global should not be allowed in the list of replication clusters
@@ -519,8 +524,9 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         }
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
-                    this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use", "invalid-cluster"));
+            asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+                    this.testGlobalNamespaces.get(0).getLocalName(),
+                    Lists.newArrayList("use", "invalid-cluster")));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, invalid-cluster is an invalid cluster id
@@ -531,8 +537,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use", "usc")));
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
-                    this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use", "usw"));
+            asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+                    this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use", "usw")));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, usw was not configured in the list of allowed clusters
@@ -544,8 +550,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         mockZooKeeperGlobal.setAlwaysFail(Code.SESSIONEXPIRED);
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
-                    this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use"));
+            asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+                    this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -566,23 +572,24 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         policiesCache.invalidateAll();
         store.invalidateAll();
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
-                    this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use"));
+            asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+                    this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 500);
         }
 
         try {
-            namespaces.getNamespaceReplicationClusters(this.testTenant, "global", "non-existing-ns");
+            asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp, this.testTenant,
+                                                                     "global", "non-existing-ns"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
         }
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, "global", "non-existing-ns",
-                    Lists.newArrayList("use"));
+            asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant,
+                    "global", "non-existing-ns", Lists.newArrayList("use")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
@@ -597,24 +604,25 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         store.invalidateAll();
         // ensure the ZooKeeper read happens, bypassing the cache
         try {
-            namespaces.getNamespaceReplicationClusters(this.testTenant, "global",
-                    this.testGlobalNamespaces.get(0).getLocalName());
+            asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp,
+                    this.testGlobalNamespaces.get(0).getTenant(), this.testGlobalNamespaces.get(0).getCluster(),
+                    this.testGlobalNamespaces.get(0).getLocalName()));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 500);
         }
 
         try {
-            namespaces.getNamespaceReplicationClusters(this.testTenant, this.testLocalCluster,
-                    this.testLocalNamespaces.get(0).getLocalName());
+            asyncRequests(rsp -> namespaces.getNamespaceReplicationClusters(rsp, this.testTenant,
+                    this.testLocalCluster, this.testLocalNamespaces.get(0).getLocalName()));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, this.testLocalCluster,
-                    this.testLocalNamespaces.get(0).getLocalName(), Lists.newArrayList("use"));
+            asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, this.testLocalCluster,
+                    this.testLocalNamespaces.get(0).getLocalName(), Lists.newArrayList("use")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());