You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/12/15 02:29:03 UTC

[GitHub] [pulsar] 315157973 commented on a change in pull request #8948: Support configure max subscriptions per topic on the topic level policy

315157973 commented on a change in pull request #8948:
URL: https://github.com/apache/pulsar/pull/8948#discussion_r542997515



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1816,6 +1816,91 @@ public void removePersistence(@Suspended final AsyncResponse asyncResponse,
         });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Get maxSubscriptionsPerTopic config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<Integer> maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic();
+            if (!maxSubscriptionsPerTopic.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(maxSubscriptionsPerTopic.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Set maxSubscriptionsPerTopic config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Invalid value of maxSubscriptionsPerTopic")})
+    public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String encodedTopic,
+                                @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Updating maxSubscriptionsPerTopic failed", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Updating maxSubscriptionsPerTopic failed", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
+                                + ", maxSubscriptions={}"
+                        , clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic);
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Remove maxSubscriptionsPerTopic config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> {

Review comment:
       The current configuration has 3 states: disabled (0), enabled (>0), and not set.
   Disabled means that even if the policy is set at the upper level, this level will not take effect.
   `Not set` to distinguish whether to use this level of policy or the upper level. Of course, we can set -1 to mean not set. But it can only be a numeric configuration.
   Therefore, I want to be consistent with other configurations, and use null to indicate not set.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org