You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/17 00:53:52 UTC

[pulsar] branch master updated: [improve][broker] Make some operation subscription dispatchRate methods in Namespaces async (#15880)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f98a43f99 [improve][broker] Make some operation subscription dispatchRate methods in Namespaces async (#15880)
00f98a43f99 is described below

commit 00f98a43f996c40642369fb065a2735e54b74cbf
Author: yanliang <ya...@moego.pet>
AuthorDate: Fri Jun 17 08:53:43 2022 +0800

    [improve][broker] Make some operation subscription dispatchRate methods in Namespaces async (#15880)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 57 ++++++++--------------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 40 ++++++++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 49 ++++++++++++++-----
 .../apache/pulsar/broker/admin/NamespacesTest.java | 39 +++++++++++++++
 .../pulsar/broker/admin/NamespacesV2Test.java      |  2 +-
 5 files changed, 131 insertions(+), 56 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ad702e1eb36..a11165e2170 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1385,46 +1385,31 @@ public abstract class NamespacesBase extends AdminResource {
                 .thenApply(policies -> policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName()));
     }
 
-    protected void internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate) {
-        validateSuperUserAccess();
-        log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
-
-        try {
-            updatePolicies(namespaceName, (policies) -> {
-                policies.subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
-                return policies;
-            });
-            log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}",
-                    clientAppId(), namespaceName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Void> internalSetSubscriptionDispatchRateAsync(DispatchRateImpl dispatchRate) {
+        return validateSuperUserAccessAsync()
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+                    policies.subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+                    return policies;
+                })) // success is logged in the next stage, i.e. only after the policies update has completed
+                .thenRun(() -> log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on"
+                        + " namespace {}", clientAppId(), namespaceName));
     }
 
-    protected void internalDeleteSubscriptionDispatchRate() {
-        validateSuperUserAccess();
-
-        try {
-            updatePolicies(namespaceName, policies -> {
-                policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName());
-                return policies;
-            });
-            log.info("[{}] Successfully delete the subscriptionDispatchRate for cluster on namespace {}",
-                    clientAppId(), namespaceName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to delete the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Void> internalDeleteSubscriptionDispatchRateAsync() {
+        return validateSuperUserAccessAsync()
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+                    policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName());
+                    return policies;
+                })) // success is logged in the next stage, i.e. only after the policies update has completed
+                .thenRun(() -> log.info("[{}] Successfully delete the subscriptionDispatchRate for cluster on"
+                        + " namespace {}", clientAppId(), namespaceName));
     }
 
-    protected DispatchRate internalGetSubscriptionDispatchRate() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
+    protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRateAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies ->
+                        policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName()));
     }
 
     protected CompletableFuture<Void> internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index ba95927dcd5..beca9529ae0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -849,12 +849,20 @@ public class Namespaces extends NamespacesBase {
     @Path("/{property}/{cluster}/{namespace}/subscriptionDispatchRate")
     @ApiOperation(value = "Set Subscription dispatch-rate throttling for all topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void setSubscriptionDispatchRate(@PathParam("property") String property,
+    public void setSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+                                            @PathParam("property") String property,
                                             @PathParam("cluster") String cluster,
                                             @PathParam("namespace") String namespace,
                                             DispatchRateImpl dispatchRate) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetSubscriptionDispatchRate(dispatchRate);
+        internalSetSubscriptionDispatchRateAsync(dispatchRate)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to set the subscription dispatchRate for cluster on namespace {}",
+                            clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -864,11 +872,19 @@ public class Namespaces extends NamespacesBase {
             + "in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
-    public DispatchRate getSubscriptionDispatchRate(@PathParam("property") String property,
-                                                    @PathParam("cluster") String cluster,
-                                                    @PathParam("namespace") String namespace) {
+    public void getSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+                                            @PathParam("property") String property,
+                                            @PathParam("cluster") String cluster,
+                                            @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetSubscriptionDispatchRate();
+        internalGetSubscriptionDispatchRateAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get the subscription dispatchRate for cluster on namespace {}",
+                            clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -877,11 +893,19 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
-    public void deleteSubscriptionDispatchRate(@PathParam("property") String property,
+    public void deleteSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+                                               @PathParam("property") String property,
                                                @PathParam("cluster") String cluster,
                                                @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        internalDeleteSubscriptionDispatchRate();
+        internalDeleteSubscriptionDispatchRateAsync()
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to delete the subscription dispatchRate for cluster on namespace {}",
+                            clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 5e831eeaacd..5191a1170ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -838,12 +838,21 @@ public class Namespaces extends NamespacesBase {
     @Path("/{tenant}/{namespace}/subscriptionDispatchRate")
     @ApiOperation(value = "Set Subscription dispatch-rate throttling for all topics of the namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
-    public void setSubscriptionDispatchRate(@PathParam("tenant") String tenant,
-                                            @PathParam("namespace") String namespace, @ApiParam(value =
-            "Subscription dispatch rate for all topics of the specified namespace")
+    public void setSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+                                            @PathParam("tenant") String tenant,
+                                            @PathParam("namespace") String namespace,
+                                            @ApiParam(value =
+                                            "Subscription dispatch rate for all topics of the specified namespace")
                                                         DispatchRateImpl dispatchRate) {
         validateNamespaceName(tenant, namespace);
-        internalSetSubscriptionDispatchRate(dispatchRate);
+        internalSetSubscriptionDispatchRateAsync(dispatchRate)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to update the subscription dispatchRate for cluster on namespace {}",
+                            clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -853,20 +862,36 @@ public class Namespaces extends NamespacesBase {
             + "in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
-    public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tenant,
+    public void getSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
                                                     @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetSubscriptionDispatchRate();
+        internalGetSubscriptionDispatchRateAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get the subscription dispatchRate for cluster on namespace {}",
+                            clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
     @Path("/{tenant}/{namespace}/subscriptionDispatchRate")
     @ApiOperation(value = "Delete Subscription dispatch-rate throttling for all topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void deleteSubscriptionDispatchRate(@PathParam("tenant") String tenant,
+    public void deleteSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+                                               @PathParam("tenant") String tenant,
                                                @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalDeleteSubscriptionDispatchRate();
+        internalDeleteSubscriptionDispatchRateAsync()
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to delete the subscription dispatchRate for cluster on namespace {}",
+                            clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -879,7 +904,8 @@ public class Namespaces extends NamespacesBase {
         internalDeleteSubscribeRateAsync()
                 .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
-                    log.error("Failed to delete the subscribeRate for cluster on namespace {}", namespaceName, ex);
+                    log.error("[{}] Failed to delete the subscribeRate for cluster on namespace {}",
+                            clientAppId(), namespaceName, ex);
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
                 });
@@ -897,7 +923,8 @@ public class Namespaces extends NamespacesBase {
         internalSetSubscribeRateAsync(subscribeRate)
                 .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
-                    log.error("Failed to update the subscribeRate for cluster on namespace {}", namespaceName, ex);
+                    log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}",
+                            clientAppId(), namespaceName, ex);
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
                 });
@@ -914,7 +941,7 @@ public class Namespaces extends NamespacesBase {
         internalGetSubscribeRateAsync()
                 .thenAccept(subscribeRate -> asyncResponse.resume(subscribeRate))
                 .exceptionally(ex -> {
-                    log.error("Failed to get subscribe rate for namespace {}", namespaceName, ex);
+                    log.error("[{}] Failed to get subscribe rate for namespace {}", clientAppId(), namespaceName, ex);
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
                 });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index e6b928a57a7..4857f85a905 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -99,6 +100,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.zookeeper.KeeperException.Code;
@@ -1838,4 +1840,41 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         BundlesData bundles = admin.namespaces().getBundles(namespace);
         assertEquals(bundles.getNumBundles(), 14);
     }
+
+    @Test
+    public void testOperationSubscriptionDispatchRate() throws Exception {
+        String namespace = "sub-dispatchrate-namespace";
+
+        // 0. create the namespace used by the subscription dispatch rate checks
+        asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster,
+                namespace, BundlesData.builder().build()));
+
+        // 1. set a subscription dispatch rate built with the builder defaults
+        asyncRequests(response -> namespaces.setSubscriptionDispatchRate(response, this.testTenant, this.testLocalCluster,
+                namespace, DispatchRateImpl.builder().build()));
+
+        // 2. read the subscription dispatch rate back and check the stored value
+        DispatchRate dispatchRate = (DispatchRate) asyncRequests(
+                response -> namespaces.getSubscriptionDispatchRate(response,
+                        this.testTenant, this.testLocalCluster, namespace));
+        assertNotNull(dispatchRate);
+        assertEquals(dispatchRate.getDispatchThrottlingRateInMsg(), -1);
+
+        // 3. delete the subscription dispatch rate, then verify the getter returns null
+        asyncRequests(response -> namespaces.deleteSubscriptionDispatchRate(response,
+                this.testTenant, this.testLocalCluster, namespace));
+        assertNull(asyncRequests(response -> namespaces.getSubscriptionDispatchRate(response,
+                this.testTenant, this.testLocalCluster,
+                namespace)));
+
+        // 4. exception check: setting the rate on namespace "testNamespace" is expected to fail with 404
+        try {
+            asyncRequests(response -> namespaces.setSubscriptionDispatchRate(response,
+                    this.testTenant, this.testLocalCluster, "testNamespace", null));
+            fail("should have failed");
+        } catch (RestException e) {
+            assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index feca36307e6..aa0f6793fcd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -197,4 +197,4 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest {
                 this.testTenant, this.testNamespace));
         assertTrue(Objects.isNull(dispatchRate));
     }
-}
\ No newline at end of file
+}