You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/14 07:35:07 UTC

[GitHub] [pulsar] 315157973 opened a new pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

315157973 opened a new pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818


   
   ### Motivation
   support set MaxUnackedMessagesOnConsumer on topic level
   
   ### Modifications
   Support set/get/remove MaxUnackedMessagesOnConsumer policy on topic level.
   
   ### Verifying this change
   Added Unit test to verify set/get/remove MaxUnackedMessagesOnConsumer policy at Topic level work as expected when Topic level policy is enabled/disabled
   
   - org.apache.pulsar.broker.admin.MaxUnackedMessagesTest#testMaxUnackedMessagesOnConsumerApi
   - org.apache.pulsar.broker.admin.MaxUnackedMessagesTest#testMaxUnackedMessagesOnConsumer


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#issuecomment-673978592


   @jianyun8023 Please help review this PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#issuecomment-674616772


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470624931



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
       minor: remove `asyncResponse` on `setMaxUnackedMessagesOnConsumer `




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470995489



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
       I'm sorry, I made a mistake.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiazhai commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
jiazhai commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470972777



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
       @jianyun8023 would you please explain a little more of your reason for asyncResponse?
   We need asyncResponse to make the request async.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470663570



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
       emm...I am a little confused that all previous set methods have `asyncResponse`,such as `setBacklogQuota`、`setMessageTTL`、`setRetention`...
   Is it better to be consistent?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470990626



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
       Modify this method, remove the parameter `asyncResponse`, and modify the return  to `CompletableFuture<Void>`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470663570



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
       emm...I am a little confused that all previous set methods have `asyncResponse`,such as `setBacklogQuota`、`setMessageTTL`、`setRetention`...
   Is it better to be consistent? @jianyun8023 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470994808



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
       > Modify this method, remove the parameter `asyncResponse`, and modify the return to `CompletableFuture<Void>`
   
   Is it a mistake? This is not an internal method. Please take a look. @jianyun8023 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #7818: Support MaxUnackedMessagesOnConsumer on topic level

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #7818:
URL: https://github.com/apache/pulsar/pull/7818#discussion_r470994862



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+            asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);

Review comment:
       internalSetMaxUnackedMessagesOnConsumer already returns CompletableFuture<Void> 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org