You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/10 01:28:26 UTC
[pulsar] branch master updated: [improve][broker] make publishRate and dispatchRate operation async (#15946)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4503fe047bb [improve][broker] make publishRate and dispatchRate operation async (#15946)
4503fe047bb is described below
commit 4503fe047bb9c2617f4a656c31537bafd09c2659
Author: zhangqian <50...@qq.com>
AuthorDate: Fri Jun 10 09:28:19 2022 +0800
[improve][broker] make publishRate and dispatchRate operation async (#15946)
Co-authored-by: ceceezhang <ce...@tencent.com>
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 59 +++++++++++++++++++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 64 ++++++++++++++++++----
.../pulsar/broker/admin/NamespacesV2Test.java | 40 +++++++++++++-
3 files changed, 149 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index af6f465e433..ad702e1eb36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1257,6 +1257,16 @@ public abstract class NamespacesBase extends AdminResource {
namespaceName);
}
+ protected CompletableFuture<Void> internalSetPublishRateAsync(PublishRate maxPublishMessageRate) {
+ log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
+ return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate);
+ log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}",
+ clientAppId(), namespaceName);
+ return policies;
+ }));
+ }
+
protected void internalRemovePublishRate() {
validateSuperUserAccess();
log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName, topicName);
@@ -1276,6 +1286,18 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ /**
+  * Asynchronously removes the local cluster's entry from this namespace's publish-rate policy map.
+  * validateSuperUserAccessAsync() is invoked first; the policies update is chained on its completion.
+  *
+  * @return the future produced by the chained policies update
+  */
+ protected CompletableFuture<Void> internalRemovePublishRateAsync() {
+     // Namespace-scoped operation: report only the namespace, no topic takes part in this call.
+     log.info("[{}] Remove namespace publish-rate {}", clientAppId(), namespaceName);
+     return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+         // The map may be absent; in that case there is nothing to remove.
+         if (policies.publishMaxMessageRate != null) {
+             policies.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName());
+         }
+         log.info("[{}] Successfully remove the publish_max_message_rate for cluster on namespace {}",
+                 clientAppId(), namespaceName);
+         return policies;
+     }));
+ }
+
protected PublishRate internalGetPublishRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
@@ -1283,11 +1305,17 @@ public abstract class NamespacesBase extends AdminResource {
return policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
}
+ protected CompletableFuture<PublishRate> internalGetPublishRateAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies ->
+ policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName()));
+ }
+
@SuppressWarnings("deprecation")
protected void internalSetTopicDispatchRate(DispatchRateImpl dispatchRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
-
try {
updatePolicies(namespaceName, policies -> {
policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
@@ -1303,6 +1331,18 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ @SuppressWarnings("deprecation")
+ protected CompletableFuture<Void> internalSetTopicDispatchRateAsync(DispatchRateImpl dispatchRate) {
+ log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
+ return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+ policies.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+ log.info("[{}] Successfully updated the dispatchRate for cluster on namespace {}", clientAppId(),
+ namespaceName);
+ return policies;
+ }));
+ }
+
protected void internalDeleteTopicDispatchRate() {
validateSuperUserAccess();
try {
@@ -1320,6 +1360,16 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected CompletableFuture<Void> internalDeleteTopicDispatchRateAsync() {
+ return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName());
+ policies.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName());
+ log.info("[{}] Successfully delete the dispatchRate for cluster on namespace {}", clientAppId(),
+ namespaceName);
+ return policies;
+ }));
+ }
+
@SuppressWarnings("deprecation")
protected DispatchRate internalGetTopicDispatchRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
@@ -1328,6 +1378,13 @@ public abstract class NamespacesBase extends AdminResource {
return policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
}
+ @SuppressWarnings("deprecation")
+ protected CompletableFuture<DispatchRate> internalGetTopicDispatchRateAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies -> policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName()));
+ }
+
protected void internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index eaa8232032b..00106dd98c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -732,19 +732,33 @@ public class Namespaces extends NamespacesBase {
@Path("/{property}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void setPublishRate(@PathParam("property") String property, @PathParam("namespace") String namespace,
+ public void setPublishRate(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Publish rate for all topics of the specified namespace") PublishRate publishRate) {
validateNamespaceName(property, namespace);
- internalSetPublishRate(publishRate);
+ // Async path: the suspended response is resumed with 204 No Content once the update future completes.
+ // Any failure in the chain is handed, together with the suspended response, to
+ // resumeAsyncResponseExceptionally; unlike the sibling endpoints, nothing is logged here.
+ internalSetPublishRateAsync(publishRate)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@Path("/{property}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void removePublishRate(@PathParam("property") String property, @PathParam("namespace") String namespace) {
+ public void removePublishRate(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
- internalRemovePublishRate();
+ // Async path: the suspended response is resumed with 204 No Content once the removal future completes.
+ // On failure the error is logged and then handed, together with the suspended response, to
+ // resumeAsyncResponseExceptionally.
+ internalRemovePublishRateAsync()
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to remove the publish_max_message_rate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -754,31 +768,52 @@ public class Namespaces extends NamespacesBase {
+ "-1 means msg-publish-rate or byte-publish-rate not configured in publish-rate yet")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
- public PublishRate getPublishRate(
+ public void getPublishRate(@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
- return internalGetPublishRate();
+ // Async path: the suspended response is resumed with the publish rate the lookup future yields.
+ internalGetPublishRateAsync()
+ .thenAccept(publishRate -> asyncResponse.resume(publishRate))
+ .exceptionally(ex -> {
+ log.error("Failed to get publish rate for namespace {}", namespaceName, ex);
+ // The suspended response must also be resumed on failure; logging alone would leave the
+ // request without any reply. The sibling endpoints use the same call.
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{tenant}/{namespace}/dispatchRate")
@ApiOperation(value = "Set dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ public void setDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Dispatch rate for all topics of the specified namespace")
DispatchRateImpl dispatchRate) {
validateNamespaceName(tenant, namespace);
- internalSetTopicDispatchRate(dispatchRate);
+ // Async path: the suspended response is resumed with 204 No Content once the update future completes.
+ // On failure the error is logged and then handed, together with the suspended response, to
+ // resumeAsyncResponseExceptionally.
+ internalSetTopicDispatchRateAsync(dispatchRate)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to update the dispatchRate for cluster on namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@Path("/{tenant}/{namespace}/dispatchRate")
@ApiOperation(value = "Delete dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void deleteDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+ public void deleteDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalDeleteTopicDispatchRate();
+ // Async path: the suspended response is resumed with 204 No Content once the delete future completes.
+ // On failure the error is logged and then handed, together with the suspended response, to
+ // resumeAsyncResponseExceptionally.
+ internalDeleteTopicDispatchRateAsync()
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to delete the dispatchRate for cluster on namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -787,10 +822,15 @@ public class Namespaces extends NamespacesBase {
+ "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public DispatchRate getDispatchRate(@PathParam("tenant") String tenant,
+ public void getDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetTopicDispatchRate();
+ // Async path: the suspended response is resumed with the dispatch rate the lookup future yields.
+ // Any failure in the chain is handed, together with the suspended response, to
+ // resumeAsyncResponseExceptionally; nothing is logged here.
+ internalGetTopicDispatchRateAsync()
+ .thenAccept(dispatchRate -> asyncResponse.resume(dispatchRate))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index 4c4929390bd..feca36307e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -39,10 +39,13 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -159,4 +162,39 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest {
}
}
-}
+ /**
+  * Drives the publish-rate endpoints in sequence: set, get (expects a value), remove, get (expects null).
+  */
+ @Test
+ public void testOperationPublishRate() throws Exception {
+     // Step 1: store a default-constructed publish rate on the test namespace.
+     asyncRequests(resp ->
+             namespaces.setPublishRate(resp, this.testTenant, this.testNamespace, new PublishRate()));
+
+     // Step 2: reading it back must yield a value.
+     PublishRate afterSet = (PublishRate) asyncRequests(resp ->
+             namespaces.getPublishRate(resp, this.testTenant, this.testNamespace));
+     assertTrue(Objects.nonNull(afterSet));
+
+     // Step 3: after removal, reading it back must yield null.
+     asyncRequests(resp -> namespaces.removePublishRate(resp, this.testTenant, this.testNamespace));
+     PublishRate afterRemove = (PublishRate) asyncRequests(resp ->
+             namespaces.getPublishRate(resp, this.testTenant, this.testNamespace));
+     assertTrue(Objects.isNull(afterRemove));
+ }
+
+ /**
+  * Drives the dispatch-rate endpoints in sequence: set, get (expects a value), delete, get (expects null).
+  */
+ @Test
+ public void testOperationDispatchRate() throws Exception {
+     // Step 1: store a default-constructed dispatch rate on the test namespace.
+     asyncRequests(resp ->
+             namespaces.setDispatchRate(resp, this.testTenant, this.testNamespace, new DispatchRateImpl()));
+
+     // Step 2: reading it back must yield a value.
+     DispatchRate afterSet = (DispatchRateImpl) asyncRequests(resp ->
+             namespaces.getDispatchRate(resp, this.testTenant, this.testNamespace));
+     assertTrue(Objects.nonNull(afterSet));
+
+     // Step 3: after deletion, reading it back must yield null.
+     asyncRequests(resp -> namespaces.deleteDispatchRate(resp, this.testTenant, this.testNamespace));
+     DispatchRate afterDelete = (DispatchRateImpl) asyncRequests(resp ->
+             namespaces.getDispatchRate(resp, this.testTenant, this.testNamespace));
+     assertTrue(Objects.isNull(afterDelete));
+ }
+}
\ No newline at end of file