You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/19 09:35:12 UTC

[GitHub] [pulsar] shibd commented on a diff in pull request #15656: [improve][broker] Make some operation SubscribeRate methods in Namespaces async

shibd commented on code in PR #15656:
URL: https://github.com/apache/pulsar/pull/15656#discussion_r876798457


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1373,43 +1373,41 @@ protected DispatchRate internalGetSubscriptionDispatchRate() {
         return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
     }
 
-    protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
-        validateSuperUserAccess();
+    protected CompletableFuture<Void> internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
         log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
-        try {
+        return validateSuperUserAccessAsync().thenAccept(__ -> {
             updatePolicies(namespaceName, policies -> {
                 policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
                 return policies;
             });
             log.info("[{}] Successfully updated the subscribeRate for cluster on namespace {}", clientAppId(),
                     namespaceName);
-        } catch (Exception e) {
+        }).exceptionally(ex -> {
             log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
+                    namespaceName, ex);
+            throw new RestException(ex);
+        });
     }
 
-    protected void internalDeleteSubscribeRate() {
-        validateSuperUserAccess();
-        try {
+    protected CompletableFuture<Void> internalDeleteSubscribeRateAsync() {
+        return validateSuperUserAccessAsync().thenAccept(__ -> {
             updatePolicies(namespaceName, policies -> {
                 policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
                 return policies;
             });
             log.info("[{}] Successfully delete the subscribeRate for cluster on namespace {}", clientAppId(),
                     namespaceName);
-        } catch (Exception e) {
+        }).exceptionally(ex -> {
             log.error("[{}] Failed to delete the subscribeRate for cluster on namespace {}", clientAppId(),

Review Comment:
   Exception handling can be handed over to the REST layer.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1373,43 +1373,41 @@ protected DispatchRate internalGetSubscriptionDispatchRate() {
         return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
     }
 
-    protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
-        validateSuperUserAccess();
+    protected CompletableFuture<Void> internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
         log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
-        try {
+        return validateSuperUserAccessAsync().thenAccept(__ -> {
             updatePolicies(namespaceName, policies -> {
                 policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
                 return policies;
             });
             log.info("[{}] Successfully updated the subscribeRate for cluster on namespace {}", clientAppId(),
                     namespaceName);
-        } catch (Exception e) {
+        }).exceptionally(ex -> {
             log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}", clientAppId(),

Review Comment:
   Also, keep only one error log statement.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -783,31 +783,52 @@ public void deleteSubscriptionDispatchRate(@PathParam("tenant") String tenant,
     @DELETE
     @Path("/{tenant}/{namespace}/subscribeRate")
     @ApiOperation(value = "Delete subscribe-rate throttling for all topics of the namespace")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void deleteSubscribeRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
+    public void deleteSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalDeleteSubscribeRate();
+        internalDeleteSubscribeRateAsync()
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("Failed to delete the subscribeRate for cluster on namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/subscribeRate")
     @ApiOperation(value = "Set subscribe-rate throttling for all topics of the namespace")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void setSubscribeRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @ApiParam(value = "Subscribe rate for all topics of the specified namespace") SubscribeRate subscribeRate) {
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
+    public void setSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace,
+                                 @ApiParam(value = "Subscribe rate for all topics of the specified namespace")
+                                         SubscribeRate subscribeRate) {
         validateNamespaceName(tenant, namespace);
-        internalSetSubscribeRate(subscribeRate);
+        internalSetSubscribeRateAsync(subscribeRate)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("Failed to delete the subscribeRate for cluster on namespace {}", namespaceName, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to update the subscribeRate for cluster on namespace {}", namespaceName, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1373,43 +1373,41 @@ protected DispatchRate internalGetSubscriptionDispatchRate() {
         return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
     }
 
-    protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
-        validateSuperUserAccess();
+    protected CompletableFuture<Void> internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
         log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
-        try {
+        return validateSuperUserAccessAsync().thenAccept(__ -> {
             updatePolicies(namespaceName, policies -> {
                 policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
                 return policies;
             });
             log.info("[{}] Successfully updated the subscribeRate for cluster on namespace {}", clientAppId(),
                     namespaceName);
-        } catch (Exception e) {
+        }).exceptionally(ex -> {
             log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}", clientAppId(),

Review Comment:
   Exception handling can be handed over to the REST layer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org