You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/17 00:53:52 UTC
[pulsar] branch master updated: [improve][broker] Make some operation subscription dispatchRate methods in Namespaces async (#15880)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 00f98a43f99 [improve][broker] Make some operation subscription dispatchRate methods in Namespaces async (#15880)
00f98a43f99 is described below
commit 00f98a43f996c40642369fb065a2735e54b74cbf
Author: yanliang <ya...@moego.pet>
AuthorDate: Fri Jun 17 08:53:43 2022 +0800
[improve][broker] Make some operation subscription dispatchRate methods in Namespaces async (#15880)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 57 ++++++++--------------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 40 ++++++++++++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 49 ++++++++++++++-----
.../apache/pulsar/broker/admin/NamespacesTest.java | 39 +++++++++++++++
.../pulsar/broker/admin/NamespacesV2Test.java | 2 +-
5 files changed, 131 insertions(+), 56 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ad702e1eb36..a11165e2170 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1385,46 +1385,31 @@ public abstract class NamespacesBase extends AdminResource {
.thenApply(policies -> policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName()));
}
- protected void internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate) {
- validateSuperUserAccess();
- log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
-
- try {
- updatePolicies(namespaceName, (policies) -> {
- policies.subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
- return policies;
- });
- log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}",
- clientAppId(), namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
- namespaceName, e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Void> internalSetSubscriptionDispatchRateAsync(DispatchRateImpl dispatchRate) {
+ return validateSuperUserAccessAsync()
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+ return policies;
+ })).thenAccept(__ -> // report success only once the policies update future has completed
+ log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName));
}
- protected void internalDeleteSubscriptionDispatchRate() {
- validateSuperUserAccess();
-
- try {
- updatePolicies(namespaceName, policies -> {
- policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName());
- return policies;
- });
- log.info("[{}] Successfully delete the subscriptionDispatchRate for cluster on namespace {}",
- clientAppId(), namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to delete the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
- namespaceName, e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Void> internalDeleteSubscriptionDispatchRateAsync() {
+ return validateSuperUserAccessAsync()
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName());
+ return policies;
+ })).thenAccept(__ -> // report success only once the policies update future has completed
+ log.info("[{}] Successfully delete the subscriptionDispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName));
}
- protected DispatchRate internalGetSubscriptionDispatchRate() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
+ protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRateAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies ->
+ policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName()));
}
protected CompletableFuture<Void> internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index ba95927dcd5..beca9529ae0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -849,12 +849,20 @@ public class Namespaces extends NamespacesBase {
@Path("/{property}/{cluster}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Set Subscription dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void setSubscriptionDispatchRate(@PathParam("property") String property,
+ public void setSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
DispatchRateImpl dispatchRate) {
validateNamespaceName(property, cluster, namespace);
- internalSetSubscriptionDispatchRate(dispatchRate);
+ internalSetSubscriptionDispatchRateAsync(dispatchRate)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to set the subscription dispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -864,11 +872,19 @@ public class Namespaces extends NamespacesBase {
+ "in dispatch-rate yet")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
- public DispatchRate getSubscriptionDispatchRate(@PathParam("property") String property,
- @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
+ public void getSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetSubscriptionDispatchRate();
+ internalGetSubscriptionDispatchRateAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get the subscription dispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -877,11 +893,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification")})
- public void deleteSubscriptionDispatchRate(@PathParam("property") String property,
+ public void deleteSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- internalDeleteSubscriptionDispatchRate();
+ internalDeleteSubscriptionDispatchRateAsync()
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to delete the subscription dispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 5e831eeaacd..5191a1170ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -838,12 +838,21 @@ public class Namespaces extends NamespacesBase {
@Path("/{tenant}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Set Subscription dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
- public void setSubscriptionDispatchRate(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace, @ApiParam(value =
- "Subscription dispatch rate for all topics of the specified namespace")
+ public void setSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @ApiParam(value =
+ "Subscription dispatch rate for all topics of the specified namespace")
DispatchRateImpl dispatchRate) {
validateNamespaceName(tenant, namespace);
- internalSetSubscriptionDispatchRate(dispatchRate);
+ internalSetSubscriptionDispatchRateAsync(dispatchRate)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to update the subscription dispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -853,20 +862,36 @@ public class Namespaces extends NamespacesBase {
+ "in dispatch-rate yet")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
- public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tenant,
+ public void getSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetSubscriptionDispatchRate();
+ internalGetSubscriptionDispatchRateAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get the subscription dispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@Path("/{tenant}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Delete Subscription dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void deleteSubscriptionDispatchRate(@PathParam("tenant") String tenant,
+ public void deleteSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalDeleteSubscriptionDispatchRate();
+ internalDeleteSubscriptionDispatchRateAsync()
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to delete the subscription dispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -879,7 +904,8 @@ public class Namespaces extends NamespacesBase {
internalDeleteSubscribeRateAsync()
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- log.error("Failed to delete the subscribeRate for cluster on namespace {}", namespaceName, ex);
+ log.error("[{}] Failed to delete the subscribeRate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -897,7 +923,8 @@ public class Namespaces extends NamespacesBase {
internalSetSubscribeRateAsync(subscribeRate)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- log.error("Failed to update the subscribeRate for cluster on namespace {}", namespaceName, ex);
+ log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}",
+ clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -914,7 +941,7 @@ public class Namespaces extends NamespacesBase {
internalGetSubscribeRateAsync()
.thenAccept(subscribeRate -> asyncResponse.resume(subscribeRate))
.exceptionally(ex -> {
- log.error("Failed to get subscribe rate for namespace {}", namespaceName, ex);
+ log.error("[{}] Failed to get subscribe rate for namespace {}", clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index e6b928a57a7..4857f85a905 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -99,6 +100,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.zookeeper.KeeperException.Code;
@@ -1838,4 +1840,41 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
BundlesData bundles = admin.namespaces().getBundles(namespace);
assertEquals(bundles.getNumBundles(), 14);
}
+
+ @Test
+ public void testOperationSubscriptionDispatchRate() throws Exception {
+ String namespace = "sub-dispatchrate-namespace";
+
+ // 0. create the namespace used by the subscription dispatch rate checks below
+ asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster,
+ namespace, BundlesData.builder().build()));
+
+ // 1. set a subscription dispatch rate built with the builder defaults
+ asyncRequests(response -> namespaces.setSubscriptionDispatchRate(response, this.testTenant, this.testLocalCluster,
+ namespace, DispatchRateImpl.builder().build()));
+
+ // 2. read the subscription dispatch rate back and check the stored value
+ DispatchRate dispatchRate = (DispatchRate) asyncRequests(
+ response -> namespaces.getSubscriptionDispatchRate(response,
+ this.testTenant, this.testLocalCluster, namespace));
+ assertNotNull(dispatchRate);
+ assertEquals(dispatchRate.getDispatchThrottlingRateInMsg(), -1);
+
+ // 3. delete the subscription dispatch rate, then check that a read returns null
+ asyncRequests(response -> namespaces.deleteSubscriptionDispatchRate(response,
+ this.testTenant, this.testLocalCluster, namespace));
+ assertNull(asyncRequests(response -> namespaces.getSubscriptionDispatchRate(response,
+ this.testTenant, this.testLocalCluster,
+ namespace)));
+
+ // 4. exception check: a set on "testNamespace" is expected to be rejected with 404 NOT_FOUND
+ try {
+ asyncRequests(response -> namespaces.setSubscriptionDispatchRate(response,
+ this.testTenant, this.testLocalCluster, "testNamespace", null));
+ fail("should have failed");
+ } catch (RestException e) {
+ assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
+ }
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index feca36307e6..aa0f6793fcd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -197,4 +197,4 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest {
this.testTenant, this.testNamespace));
assertTrue(Objects.isNull(dispatchRate));
}
-}
\ No newline at end of file
+}