You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/09/11 07:35:00 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make setPersistence method async in Namespaces (#17421)
This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7bb290c8d39 [improve][broker][PIP-149]make setPersistence method async in Namespaces (#17421)
7bb290c8d39 is described below
commit 7bb290c8d394811d436a3e97054d81d4c8208af4
Author: HuangZeGui <hz...@126.com>
AuthorDate: Sun Sep 11 15:34:50 2022 +0800
[improve][broker][PIP-149]make setPersistence method async in Namespaces (#17421)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 26 ++++---------------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 14 +++++++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 12 +++++++--
.../apache/pulsar/broker/admin/NamespacesTest.java | 30 +++++++++++++++-------
4 files changed, 47 insertions(+), 35 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d0f1ef0acc2..b8f3ac8a7a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1509,27 +1509,11 @@ public abstract class NamespacesBase extends AdminResource {
.thenCompose(__ -> doUpdatePersistenceAsync(null));
}
- protected void internalSetPersistence(PersistencePolicies persistence) {
- validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
- validatePersistencePolicies(persistence);
-
- doUpdatePersistence(persistence);
- }
-
- private void doUpdatePersistence(PersistencePolicies persistence) {
- try {
- updatePolicies(namespaceName, policies -> {
- policies.persistence = persistence;
- return policies;
- });
- log.info("[{}] Successfully updated persistence configuration: namespace={}, map={}", clientAppId(),
- namespaceName, jsonMapper().writeValueAsString(persistence));
- } catch (Exception e) {
- log.error("[{}] Failed to update persistence configuration for namespace {}", clientAppId(), namespaceName,
- e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Void> internalSetPersistenceAsync(PersistencePolicies persistence) {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenAccept(__ -> validatePersistencePolicies(persistence))
+ .thenCompose(__ -> doUpdatePersistenceAsync(persistence));
}
private CompletableFuture<Void> doUpdatePersistenceAsync(PersistencePolicies persistence) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4f603aaeb30..54f4c0c85c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1213,10 +1213,18 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 400, message = "Invalid persistence policies") })
- public void setPersistence(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, PersistencePolicies persistence) {
+ public void setPersistence(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+ PersistencePolicies persistence) {
validateNamespaceName(property, cluster, namespace);
- internalSetPersistence(persistence);
+ internalSetPersistenceAsync(persistence)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to update the persistence for a namespace {}", clientAppId(), namespaceName,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 445641718da..03f5bbdd115 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1235,11 +1235,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 400, message = "Invalid persistence policies")})
- public void setPersistence(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ public void setPersistence(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Persistence policies for the specified namespace", required = true)
PersistencePolicies persistence) {
validateNamespaceName(tenant, namespace);
- internalSetPersistence(persistence);
+ internalSetPersistenceAsync(persistence)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to update the persistence for a namespace {}", clientAppId(), namespaceName,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c23407bb447..73cf4914ffe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.zookeeper.KeeperException.Code;
@@ -196,6 +197,14 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
PolicyName.RETENTION, PolicyOperation.WRITE);
+ doReturn(FutureUtil.failedFuture(new RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
+ .validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+ doReturn(FutureUtil.failedFuture(new RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
+ .validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.RETENTION, PolicyOperation.WRITE);
+
nsSvc = pulsar.getNamespaceService();
}
@@ -1065,8 +1074,12 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
public void testPersistence() throws Exception {
NamespaceName testNs = this.testLocalNamespaces.get(0);
PersistencePolicies persistence1 = new PersistencePolicies(3, 2, 1, 0.0);
- namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence1);
AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.setPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence1);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+ response = mock(AsyncResponse.class);
namespaces.getPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName());
ArgumentCaptor<PersistencePolicies> captor = ArgumentCaptor.forClass(PersistencePolicies.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
@@ -1076,14 +1089,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testPersistenceUnauthorized() throws Exception {
- try {
- NamespaceName testNs = this.testLocalNamespaces.get(3);
- PersistencePolicies persistence = new PersistencePolicies(3, 2, 1, 0.0);
- namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence);
- fail("Should fail");
- } catch (RestException e) {
- assertEquals(e.getResponse().getStatus(), Status.UNAUTHORIZED.getStatusCode());
- }
+ NamespaceName testNs = this.testLocalNamespaces.get(3);
+ PersistencePolicies persistence = new PersistencePolicies(3, 2, 1, 0.0);
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.setPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence);
+ ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
+ verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+ assertEquals(errorCaptor.getValue().getResponse().getStatus(), Response.Status.UNAUTHORIZED.getStatusCode());
}
@Test