Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/15 12:27:34 UTC

[GitHub] [pulsar] jiazhai commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

jiazhai commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470972777



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
   @jianyun8023 could you explain your reasoning about asyncResponse in a little more detail?
   We need asyncResponse to make the request asynchronous.
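
   To illustrate the point about asyncResponse, here is a minimal, self-contained sketch of the pattern the diff uses: the handler returns immediately, and the response is resumed later from the future's completion callback. It does not use the real JAX-RS or Pulsar classes. `ResponseSink` is a hypothetical stand-in for `AsyncResponse`, `updatePolicy` is a hypothetical stand-in for `internalSetMaxUnackedMessagesOnConsumer`, and the negative-value check and the "204 No Content" string are assumptions made only so both paths can be exercised.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncResumeSketch {

    /** Hypothetical stand-in for javax.ws.rs.container.AsyncResponse. */
    public interface ResponseSink {
        void resume(Object result);
    }

    /**
     * Hypothetical stand-in for the broker's policy update.
     * The work runs on another thread, so the caller is not blocked.
     */
    public static CompletableFuture<Void> updatePolicy(Integer maxUnackedNum) {
        return CompletableFuture.runAsync(() -> {
            // Assumed validation rule, for illustration only.
            if (maxUnackedNum != null && maxUnackedNum < 0) {
                throw new IllegalArgumentException("value must not be negative");
            }
        });
    }

    /** Returns right away; the sink is resumed once the future completes. */
    public static void setPolicy(ResponseSink sink, Integer maxUnackedNum) {
        updatePolicy(maxUnackedNum).whenComplete((res, ex) -> {
            if (ex != null) {
                sink.resume(ex);               // failure path
            } else {
                sink.resume("204 No Content"); // success path
            }
        });
    }

    /** Test helper: captures whatever the handler resumes with. */
    public static Object run(Integer value) {
        CompletableFuture<Object> captured = new CompletableFuture<>();
        setPolicy(captured::complete, value);
        return captured.join(); // block only here, to observe the result
    }

    public static void main(String[] args) {
        System.out.println(run(100));
        System.out.println(run(-1));
    }
}
```

   Without a suspended response object, the handler would have to block the request thread until the update finished before it could return a result.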



