You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/09/11 07:35:00 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]make setPersistence method async in Namespaces (#17421)

This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bb290c8d39 [improve][broker][PIP-149]make setPersistence method async in Namespaces (#17421)
7bb290c8d39 is described below

commit 7bb290c8d394811d436a3e97054d81d4c8208af4
Author: HuangZeGui <hz...@126.com>
AuthorDate: Sun Sep 11 15:34:50 2022 +0800

    [improve][broker][PIP-149]make setPersistence method async in Namespaces (#17421)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 26 ++++---------------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 14 +++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 12 +++++++--
 .../apache/pulsar/broker/admin/NamespacesTest.java | 30 +++++++++++++++-------
 4 files changed, 47 insertions(+), 35 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d0f1ef0acc2..b8f3ac8a7a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1509,27 +1509,11 @@ public abstract class NamespacesBase extends AdminResource {
                 .thenCompose(__ -> doUpdatePersistenceAsync(null));
     }
 
-    protected void internalSetPersistence(PersistencePolicies persistence) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        validatePersistencePolicies(persistence);
-
-        doUpdatePersistence(persistence);
-    }
-
-    private void doUpdatePersistence(PersistencePolicies persistence) {
-        try {
-            updatePolicies(namespaceName, policies -> {
-                policies.persistence = persistence;
-                return policies;
-            });
-            log.info("[{}] Successfully updated persistence configuration: namespace={}, map={}", clientAppId(),
-                    namespaceName, jsonMapper().writeValueAsString(persistence));
-        } catch (Exception e) {
-            log.error("[{}] Failed to update persistence configuration for namespace {}", clientAppId(), namespaceName,
-                    e);
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Void> internalSetPersistenceAsync(PersistencePolicies persistence) {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenAccept(__ -> validatePersistencePolicies(persistence))
+                .thenCompose(__ -> doUpdatePersistenceAsync(persistence));
     }
 
     private CompletableFuture<Void> doUpdatePersistenceAsync(PersistencePolicies persistence) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4f603aaeb30..54f4c0c85c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1213,10 +1213,18 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 400, message = "Invalid persistence policies") })
-    public void setPersistence(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, PersistencePolicies persistence) {
+    public void setPersistence(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
+                               @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+                               PersistencePolicies persistence) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetPersistence(persistence);
+        internalSetPersistenceAsync(persistence)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to update the persistence for a namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 445641718da..03f5bbdd115 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1235,11 +1235,19 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 400, message = "Invalid persistence policies")})
-    public void setPersistence(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+    public void setPersistence(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                               @PathParam("namespace") String namespace,
                                @ApiParam(value = "Persistence policies for the specified namespace", required = true)
                                        PersistencePolicies persistence) {
         validateNamespaceName(tenant, namespace);
-        internalSetPersistence(persistence);
+        internalSetPersistenceAsync(persistence)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to update the persistence for a namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c23407bb447..73cf4914ffe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.zookeeper.KeeperException.Code;
@@ -196,6 +197,14 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
                         PolicyName.RETENTION, PolicyOperation.WRITE);
 
+        doReturn(FutureUtil.failedFuture(new RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
+                .validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
+                        PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+        doReturn(FutureUtil.failedFuture(new RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
+                .validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
+                        PolicyName.RETENTION, PolicyOperation.WRITE);
+
         nsSvc = pulsar.getNamespaceService();
     }
 
@@ -1065,8 +1074,12 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     public void testPersistence() throws Exception {
         NamespaceName testNs = this.testLocalNamespaces.get(0);
         PersistencePolicies persistence1 = new PersistencePolicies(3, 2, 1, 0.0);
-        namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence1);
         AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.setPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence1);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+        response = mock(AsyncResponse.class);
         namespaces.getPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName());
         ArgumentCaptor<PersistencePolicies> captor = ArgumentCaptor.forClass(PersistencePolicies.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
@@ -1076,14 +1089,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testPersistenceUnauthorized() throws Exception {
-        try {
-            NamespaceName testNs = this.testLocalNamespaces.get(3);
-            PersistencePolicies persistence = new PersistencePolicies(3, 2, 1, 0.0);
-            namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence);
-            fail("Should fail");
-        } catch (RestException e) {
-            assertEquals(e.getResponse().getStatus(), Status.UNAUTHORIZED.getStatusCode());
-        }
+        NamespaceName testNs = this.testLocalNamespaces.get(3);
+        PersistencePolicies persistence = new PersistencePolicies(3, 2, 1, 0.0);
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.setPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence);
+        ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
+        verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+        assertEquals(errorCaptor.getValue().getResponse().getStatus(), Response.Status.UNAUTHORIZED.getStatusCode());
     }
 
     @Test