You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/10 01:28:26 UTC

[pulsar] branch master updated: [improve][broker] make publishRate and dispatchRate operation async (#15946)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4503fe047bb [improve][broker] make publishRate and dispatchRate operation async (#15946)
4503fe047bb is described below

commit 4503fe047bb9c2617f4a656c31537bafd09c2659
Author: zhangqian <50...@qq.com>
AuthorDate: Fri Jun 10 09:28:19 2022 +0800

    [improve][broker] make publishRate and dispatchRate operation async (#15946)
    
    Co-authored-by: ceceezhang <ce...@tencent.com>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 59 +++++++++++++++++++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 64 ++++++++++++++++++----
 .../pulsar/broker/admin/NamespacesV2Test.java      | 40 +++++++++++++-
 3 files changed, 149 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index af6f465e433..ad702e1eb36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1257,6 +1257,16 @@ public abstract class NamespacesBase extends AdminResource {
                 namespaceName);
     }
 
+    protected CompletableFuture<Void> internalSetPublishRateAsync(PublishRate maxPublishMessageRate) {
+        log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
+        return validateSuperUserAccessAsync().thenCompose(ignored -> updatePoliciesAsync(namespaceName, current -> {
+            current.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate);
+            log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}",
+                    clientAppId(), namespaceName);
+            return current; // same instance that was mutated above
+        }));
+    }
+
     protected void internalRemovePublishRate() {
         validateSuperUserAccess();
         log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName, topicName);
@@ -1276,6 +1286,18 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected CompletableFuture<Void> internalRemovePublishRateAsync() {
+        log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName, topicName);
+        return validateSuperUserAccessAsync().thenCompose(ignored -> updatePoliciesAsync(namespaceName, current -> {
+            if (current.publishMaxMessageRate != null) { // tolerate policies that carry no rate map
+                current.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName());
+            }
+            log.info("[{}] Successfully remove the publish_max_message_rate for cluster on namespace {}", clientAppId(),
+                    namespaceName);
+            return current;
+        }));
+    }
+
     protected PublishRate internalGetPublishRate() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
 
@@ -1283,11 +1305,17 @@ public abstract class NamespacesBase extends AdminResource {
         return policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
     }
 
+    protected CompletableFuture<PublishRate> internalGetPublishRateAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.publishMaxMessageRate == null ? null // same guard as the remove path
+                        : policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName()));
+    }
+
     @SuppressWarnings("deprecation")
     protected void internalSetTopicDispatchRate(DispatchRateImpl dispatchRate) {
         validateSuperUserAccess();
         log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
-
         try {
             updatePolicies(namespaceName, policies -> {
                 policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
@@ -1303,6 +1331,18 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    @SuppressWarnings("deprecation")
+    protected CompletableFuture<Void> internalSetTopicDispatchRateAsync(DispatchRateImpl dispatchRate) {
+        log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
+        return validateSuperUserAccessAsync().thenCompose(ignored -> updatePoliciesAsync(namespaceName, current -> {
+            current.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+            current.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+            log.info("[{}] Successfully updated the dispatchRate for cluster on namespace {}", clientAppId(),
+                    namespaceName);
+            return current; // both rate maps now hold the same entry for this cluster
+        }));
+    }
+
     protected void internalDeleteTopicDispatchRate() {
         validateSuperUserAccess();
         try {
@@ -1320,6 +1360,16 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected CompletableFuture<Void> internalDeleteTopicDispatchRateAsync() {
+        return validateSuperUserAccessAsync().thenCompose(ignored -> updatePoliciesAsync(namespaceName, current -> {
+            current.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName());
+            current.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName());
+            log.info("[{}] Successfully delete the dispatchRate for cluster on namespace {}", clientAppId(),
+                    namespaceName);
+            return current; // this cluster's entry dropped from both rate maps
+        }));
+    }
+
     @SuppressWarnings("deprecation")
     protected DispatchRate internalGetTopicDispatchRate() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
@@ -1328,6 +1378,13 @@ public abstract class NamespacesBase extends AdminResource {
         return policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
     }
 
+    @SuppressWarnings("deprecation")
+    protected CompletableFuture<DispatchRate> internalGetTopicDispatchRateAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+                .thenCompose(ignored -> getNamespacePoliciesAsync(namespaceName)) // runs only after the READ check
+                .thenApply(found -> found.topicDispatchRate.get(pulsar().getConfiguration().getClusterName()));
+    }
+
     protected void internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate) {
         validateSuperUserAccess();
         log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index eaa8232032b..00106dd98c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -732,19 +732,33 @@ public class Namespaces extends NamespacesBase {
     @Path("/{property}/{namespace}/publishRate")
     @ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void setPublishRate(@PathParam("property") String property, @PathParam("namespace") String namespace,
+    public void setPublishRate(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+            @PathParam("namespace") String namespace,
             @ApiParam(value = "Publish rate for all topics of the specified namespace") PublishRate publishRate) {
         validateNamespaceName(property, namespace);
-        internalSetPublishRate(publishRate);
+        internalSetPublishRateAsync(publishRate)
+                .thenAccept(ignored -> asyncResponse.resume(Response.noContent().build())) // 204 on success
+                .exceptionally(failure -> {
+                    resumeAsyncResponseExceptionally(asyncResponse, failure); // error still completes the request
+                    return null;
+                });
     }
 
     @DELETE
     @Path("/{property}/{namespace}/publishRate")
     @ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void removePublishRate(@PathParam("property") String property, @PathParam("namespace") String namespace) {
+    public void removePublishRate(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(property, namespace);
-        internalRemovePublishRate();
+        internalRemovePublishRateAsync()
+                .thenAccept(ignored -> asyncResponse.resume(Response.noContent().build())) // 204 on success
+                .exceptionally(failure -> {
+                    log.error("[{}] Failed to remove the publish_max_message_rate for cluster on namespace {}",
+                            clientAppId(), namespaceName, failure);
+                    resumeAsyncResponseExceptionally(asyncResponse, failure); // error still completes the request
+                    return null;
+                });
     }
 
     @GET
@@ -754,31 +768,52 @@ public class Namespaces extends NamespacesBase {
                     + "-1 means msg-publish-rate or byte-publish-rate not configured in publish-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
-    public PublishRate getPublishRate(
+    public void getPublishRate(@Suspended AsyncResponse asyncResponse,
             @PathParam("property") String property,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(property, namespace);
-        return internalGetPublishRate();
+        internalGetPublishRateAsync().thenAccept(publishRate -> asyncResponse.resume(publishRate))
+                .exceptionally(ex -> {
+                    log.error("Failed to get publish rate for namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex); // else the suspended request never completes
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/dispatchRate")
     @ApiOperation(value = "Set dispatch-rate throttling for all topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+    public void setDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
             @ApiParam(value = "Dispatch rate for all topics of the specified namespace")
                     DispatchRateImpl dispatchRate) {
         validateNamespaceName(tenant, namespace);
-        internalSetTopicDispatchRate(dispatchRate);
+        internalSetTopicDispatchRateAsync(dispatchRate)
+                .thenAccept(ignored -> asyncResponse.resume(Response.noContent().build())) // 204 on success
+                .exceptionally(failure -> {
+                    log.error("[{}] Failed to update the dispatchRate for cluster on namespace {}", clientAppId(),
+                            namespaceName, failure);
+                    resumeAsyncResponseExceptionally(asyncResponse, failure); // error still completes the request
+                    return null;
+                });
     }
 
     @DELETE
     @Path("/{tenant}/{namespace}/dispatchRate")
     @ApiOperation(value = "Delete dispatch-rate throttling for all topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void deleteDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void deleteDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalDeleteTopicDispatchRate();
+        internalDeleteTopicDispatchRateAsync()
+                .thenAccept(ignored -> asyncResponse.resume(Response.noContent().build())) // 204 on success
+                .exceptionally(failure -> {
+                    log.error("[{}] Failed to delete the dispatchRate for cluster on namespace {}", clientAppId(),
+                            namespaceName, failure);
+                    resumeAsyncResponseExceptionally(asyncResponse, failure); // error still completes the request
+                    return null;
+                });
     }
 
     @GET
@@ -787,10 +822,15 @@ public class Namespaces extends NamespacesBase {
             + "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public DispatchRate getDispatchRate(@PathParam("tenant") String tenant,
+    public void getDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetTopicDispatchRate();
+        internalGetTopicDispatchRateAsync()
+                .thenAccept(rate -> asyncResponse.resume(rate)) // the looked-up rate becomes the response entity
+                .exceptionally(failure -> {
+                    resumeAsyncResponseExceptionally(asyncResponse, failure); // error still completes the request
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index 4c4929390bd..feca36307e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -39,10 +39,13 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -159,4 +162,39 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest {
         }
     }
 
-}
+    @Test
+    public void testOperationPublishRate() throws Exception {
+        // Step 1: store a default-constructed publish rate on the test namespace.
+        asyncRequests(resp -> namespaces.setPublishRate(resp, this.testTenant, this.testNamespace,
+                new PublishRate()));
+
+        // Step 2: reading it back must yield a value.
+        PublishRate fetched = (PublishRate) asyncRequests(resp -> namespaces.getPublishRate(resp,
+                this.testTenant, this.testNamespace));
+        assertTrue(Objects.nonNull(fetched));
+
+        // Step 3: after removal, reading it back must yield null.
+        asyncRequests(resp -> namespaces.removePublishRate(resp, this.testTenant, this.testNamespace));
+        fetched = (PublishRate) asyncRequests(resp -> namespaces.getPublishRate(resp,
+                this.testTenant, this.testNamespace));
+        assertTrue(Objects.isNull(fetched));
+    }
+
+    @Test
+    public void testOperationDispatchRate() throws Exception {
+        // Step 1: store a default-constructed dispatch rate on the test namespace.
+        asyncRequests(resp -> namespaces.setDispatchRate(resp, this.testTenant, this.testNamespace,
+                new DispatchRateImpl()));
+
+        // Step 2: reading it back must yield a value.
+        DispatchRate fetched = (DispatchRateImpl) asyncRequests(resp -> namespaces.getDispatchRate(resp,
+                this.testTenant, this.testNamespace));
+        assertTrue(Objects.nonNull(fetched));
+
+        // Step 3: after deletion, reading it back must yield null.
+        asyncRequests(resp -> namespaces.deleteDispatchRate(resp, this.testTenant, this.testNamespace));
+        fetched = (DispatchRateImpl) asyncRequests(resp -> namespaces.getDispatchRate(resp,
+                this.testTenant, this.testNamespace));
+        assertTrue(Objects.isNull(fetched));
+    }
+}
\ No newline at end of file