You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/27 14:37:02 UTC

[pulsar] branch master updated: [improve][broker] Make some operation SubscribeRate methods in Namespaces async (#15656)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d09c6eb26ad [improve][broker] Make some operation SubscribeRate methods in Namespaces async (#15656)
d09c6eb26ad is described below

commit d09c6eb26adcaf0aff2c3464e445eb8df5af70a6
Author: yanliang <ya...@163.com>
AuthorDate: Fri May 27 22:36:51 2022 +0800

    [improve][broker] Make some operation SubscribeRate methods in Namespaces async (#15656)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  43 ++----
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  45 ++++--
 .../pulsar/broker/admin/NamespacesV2Test.java      | 162 +++++++++++++++++++++
 3 files changed, 210 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index dd882dfdadf..b143a03038b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1363,43 +1363,30 @@ public abstract class NamespacesBase extends AdminResource {
         return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
     }
 
-    protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
-        validateSuperUserAccess();
+    protected CompletableFuture<Void> internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
         log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
-        try {
-            updatePolicies(namespaceName, policies -> {
-                policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
-                return policies;
-            });
+        return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+            policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
             log.info("[{}] Successfully updated the subscribeRate for cluster on namespace {}", clientAppId(),
                     namespaceName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
+            return policies; // NOTE(review): the "Successfully updated" log above runs inside this updater, before updatePoliciesAsync completes
+        })); // NOTE(review): the "Set namespace subscribe-rate" log now precedes the super-user check; it used to follow it
     }
 
-    protected void internalDeleteSubscribeRate() {
-        validateSuperUserAccess();
-        try {
-            updatePolicies(namespaceName, policies -> {
-                policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
-                return policies;
-            });
+    protected CompletableFuture<Void> internalDeleteSubscribeRateAsync() { // Async delete: super-user check, then policy update
+        return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+            policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
             log.info("[{}] Successfully delete the subscribeRate for cluster on namespace {}", clientAppId(),
                     namespaceName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to delete the subscribeRate for cluster on namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
+            return policies; // NOTE(review): the "Successfully delete" log above runs inside this updater, before updatePoliciesAsync completes
+        })); // The old try/catch + RestException wrapping is gone; failures travel on the returned future
     }
 
-    protected SubscribeRate internalGetSubscribeRate() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
+    // Async read: checks RATE/READ permission, loads the namespace policies, then looks up this cluster's subscribe-rate.
+    protected CompletableFuture<SubscribeRate> internalGetSubscribeRateAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName()));
     }
 
     protected void internalRemoveReplicatorDispatchRate() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 52e47b534dc..e246c7d86e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -815,31 +815,52 @@ public class Namespaces extends NamespacesBase {
     @DELETE
     @Path("/{tenant}/{namespace}/subscribeRate")
     @ApiOperation(value = "Delete subscribe-rate throttling for all topics of the namespace")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void deleteSubscribeRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
+    public void deleteSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalDeleteSubscribeRate();
+        internalDeleteSubscribeRateAsync() // validateNamespaceName above runs synchronously, outside this chain's exceptionally()
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) // success: 204 No Content
+                .exceptionally(ex -> {
+                    log.error("Failed to delete the subscribeRate for cluster on namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null; // exceptionally() needs a return value; null stands in for Void
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/subscribeRate")
     @ApiOperation(value = "Set subscribe-rate throttling for all topics of the namespace")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void setSubscribeRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @ApiParam(value = "Subscribe rate for all topics of the specified namespace") SubscribeRate subscribeRate) {
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
+    public void setSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace,
+                                 @ApiParam(value = "Subscribe rate for all topics of the specified namespace")
+                                         SubscribeRate subscribeRate) {
         validateNamespaceName(tenant, namespace);
-        internalSetSubscribeRate(subscribeRate);
+        internalSetSubscribeRateAsync(subscribeRate) // validateNamespaceName above runs synchronously, outside this chain's exceptionally()
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) // success: 204 No Content
+                .exceptionally(ex -> {
+                    log.error("Failed to update the subscribeRate for cluster on namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null; // exceptionally() needs a return value; null stands in for Void
+                });
     }
 
     @GET
     @Path("/{tenant}/{namespace}/subscribeRate")
     @ApiOperation(value = "Get subscribe-rate configured for the namespace")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant,
-                                        @PathParam("namespace") String namespace) {
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist")})
+    public void getSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) { // result is now delivered via asyncResponse, not returned
         validateNamespaceName(tenant, namespace);
-        return internalGetSubscribeRate();
+        internalGetSubscribeRateAsync() // validateNamespaceName above runs synchronously, outside this chain's exceptionally()
+                .thenAccept(subscribeRate -> asyncResponse.resume(subscribeRate)) // NOTE(review): resumes with null when the cluster has no entry (the new test asserts this after delete)
+                .exceptionally(ex -> {
+                    log.error("Failed to get subscribe rate for namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null; // exceptionally() needs a return value; null stands in for Void
+                });
     }
 
     @DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
new file mode 100644
index 00000000000..4c4929390bd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Objects;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import org.apache.pulsar.broker.admin.v2.Namespaces;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.web.PulsarWebResource;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin-v2") // Exercises the subscribe-rate endpoints of the v2 Namespaces admin resource.
+public class NamespacesV2Test extends MockedPulsarServiceBaseTest {
+    private static final Logger log = LoggerFactory.getLogger(NamespacesV2Test.class);
+    // Spied v2 Namespaces REST resource under test; re-created for every test method in setup().
+    private Namespaces namespaces;
+    // Namespaces to create before each test: the list is built in initNamespace() and consumed in setup().
+    private List<NamespaceName> testLocalNamespaces;
+    private final String testNamespace = "v2-test-namespace";
+    private final String testTenant = "v2-tenant";
+    private final String testLocalCluster = "use";
+    // NOTE(review): nsSvc is assigned in setup(), uriField/uriInfo in initNamespace(); none of them is read in this class.
+    protected NamespaceService nsSvc;
+    protected Field uriField;
+    protected UriInfo uriInfo;
+
+    public NamespacesV2Test() {
+        super();
+    }
+    // One-time setup: records the namespace to create and makes PulsarWebResource's "uri" field accessible.
+    @BeforeClass
+    public void initNamespace() throws Exception {
+        testLocalNamespaces = Lists.newArrayList();
+        testLocalNamespaces.add(NamespaceName.get(this.testTenant, this.testLocalCluster, this.testNamespace));
+
+        uriField = PulsarWebResource.class.getDeclaredField("uri");
+        uriField.setAccessible(true);
+        uriInfo = mock(UriInfo.class);
+    }
+    // Per-test setup: runs internalSetup(), wires a spied Namespaces resource, then creates clusters, tenant, namespace.
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        resetConfig();
+        conf.setClusterName(testLocalCluster);
+        super.internalSetup();
+
+        namespaces = spy(Namespaces.class);
+        namespaces.setServletContext(new MockServletContext());
+        namespaces.setPulsar(pulsar);
+        doReturn(false).when(namespaces).isRequestHttps();
+        doReturn("test").when(namespaces).clientAppId();
+        doReturn(null).when(namespaces).originalPrincipal();
+        doReturn(null).when(namespaces).clientAuthData();
+        doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
+
+        admin.clusters().createCluster("use", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
+        admin.clusters().createCluster("usw", ClusterData.builder().serviceUrl("http://broker-usw.com:8080").build());
+        admin.clusters().createCluster("usc", ClusterData.builder().serviceUrl("http://broker-usc.com:8080").build());
+        admin.tenants().createTenant(this.testTenant,
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use", "usc", "usw")));
+
+        createTestNamespaces(this.testLocalNamespaces);
+        // NOTE(review): the two stubs below target other-tenant/use/test-namespace-1, which no test in this class touches.
+        doThrow(new RestException(Response.Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+                .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+                        PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+        doThrow(new RestException(Response.Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+                .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+                        PolicyName.RETENTION, PolicyOperation.WRITE);
+
+        nsSvc = pulsar.getNamespaceService();
+    }
+    // Per-test teardown; alwaysRun = true asks TestNG to run it even if an earlier method failed or was skipped.
+    @Override
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+        conf.setClusterName(testLocalCluster);
+    }
+    // Creates each namespace through the resource's createNamespace endpoint, passing only tenant and local name.
+    private void createTestNamespaces(List<NamespaceName> nsnames) throws Exception {
+        for (NamespaceName nsName : nsnames) {
+            asyncRequests(ctx -> namespaces.createNamespace(ctx, nsName.getTenant(),
+                    nsName.getLocalName(), null));
+        }
+    }
+    // Round-trips a namespace subscribe-rate through set, get and delete, then checks invalid-name rejection.
+    @Test
+    public void testOperationSubscribeRate() throws Exception {
+        // 1. Store a SubscribeRate built with the no-arg constructor on the test namespace.
+        asyncRequests(response -> namespaces.setSubscribeRate(response, this.testTenant,
+                this.testNamespace, new SubscribeRate()));
+
+        // 2. Read it back: a value must be present, and normalize() is expected to map it to null.
+        SubscribeRate subscribeRate =
+                (SubscribeRate) asyncRequests(response -> namespaces.getSubscribeRate(response,
+                        this.testTenant, this.testNamespace));
+        assertTrue(Objects.nonNull(subscribeRate));
+        assertTrue(Objects.isNull(SubscribeRate.normalize(subscribeRate)));
+
+        // 3. Delete the rate, then confirm a subsequent read yields null.
+        asyncRequests(response -> namespaces.deleteSubscribeRate(response, this.testTenant, this.testNamespace));
+        subscribeRate =
+                (SubscribeRate) asyncRequests(response -> namespaces.getSubscribeRate(response,
+                        this.testTenant, this.testNamespace));
+        assertTrue(Objects.isNull(subscribeRate));
+
+        // 4. A namespace name with a trailing "/" must be rejected with 412 Precondition Failed.
+        String invalidNamespace = this.testNamespace + "/";
+        try {
+            asyncRequests(response -> namespaces.setSubscribeRate(response, this.testTenant, invalidNamespace,
+                    new SubscribeRate()));
+            fail("should have failed");
+        } catch (RestException e) {
+            assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
+        }
+    }
+
+}