You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/15 15:52:59 UTC

[GitHub] [pulsar] shibd opened a new pull request, #15608: [improve][broker] Make some operation deduplication methods in Namespaces async.

shibd opened a new pull request, #15608:
URL: https://github.com/apache/pulsar/pull/15608

   ### Motivation
   See PIP https://github.com/apache/pulsar/issues/14365 and change tracker https://github.com/apache/pulsar/issues/15043.
   
   Make NamespacesBase `getDeduplication / modifyDeduplication / removeDeduplication` methods to pure async.
   
   ### Documentation
   - [x]  no-need-doc
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15608: [improve][broker] Make some operation deduplication methods in Namespaces async.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15608:
URL: https://github.com/apache/pulsar/pull/15608#discussion_r873256073


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or enabling broker side deduplication "
                                             + "for all topics in the specified namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail modify broker deduplication config for namespace {}", namespaceName, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
     @ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, boolean enableDeduplication) {
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+                                    @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+                                    boolean enableDeduplication) {
         validateNamespaceName(property, cluster, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   ```suggestion
                   .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
     @ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, boolean enableDeduplication) {
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+                                    @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+                                    boolean enableDeduplication) {
         validateNamespaceName(property, cluster, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail modify broker deduplication config for namespace {}", namespaceName, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or enabling broker side deduplication "
                                             + "for all topics in the specified namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail modify broker deduplication config for namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Remove broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(null);
+        internalModifyDeduplicationAsync(null)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail remove broker deduplication config for namespace {}", namespaceName, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or enabling broker side deduplication "
                                             + "for all topics in the specified namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail modify broker deduplication config for namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Remove broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(null);
+        internalModifyDeduplicationAsync(null)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   ```suggestion
                   .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for namespace {}", namespace, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to get broker deduplication config for namespace {}", namespace, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
     @ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, boolean enableDeduplication) {
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+                                    @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+                                    boolean enableDeduplication) {
         validateNamespaceName(property, cluster, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   From the logs, we can see that before this patch, the response HTTP code is 204.
   
   ```
   2022-05-16T08:47:05,257 - INFO  - [pulsar-web-30-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [16/May/2022:08:47:05 +0800] "POST /admin/v2/namespaces/my-property/my-ns/deduplication HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.11.0-SNAPSHOT" 6
   ```
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or enabling broker side deduplication "
                                             + "for all topics in the specified namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   ```suggestion
                   .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #15608: [improve][broker] Make some operation deduplication methods in Namespaces async.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15608:
URL: https://github.com/apache/pulsar/pull/15608#issuecomment-1127343525

   @codelipenghui All suggested fixed. PTAL. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Anonymitaet commented on pull request #15608: [improve][broker] Make some operation deduplication methods in Namespaces async.

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on PR #15608:
URL: https://github.com/apache/pulsar/pull/15608#issuecomment-1127158028

   @shibd please do not delete the backticks for the doc label, or else Bot cannot recognize it and label your PR w/ `doc-info-missing`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #15608: [improve][broker] Make some operation deduplication methods in Namespaces async.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15608:
URL: https://github.com/apache/pulsar/pull/15608#issuecomment-1126968494

   @shibd:Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #15608: [improve][broker] Make some operation deduplication methods in Namespaces async.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15608:
URL: https://github.com/apache/pulsar/pull/15608#issuecomment-1127341515

   > @shibd please do not delete the backticks for the doc label, or else Bot cannot recognize it and label your PR w/ doc-info-missing.
   Thanks, get.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #15608: [improve][broker] Make some operation deduplication methods in Namespaces async.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15608:
URL: https://github.com/apache/pulsar/pull/15608#issuecomment-1127482635

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- merged pull request #15608: [improve][broker] Make some operation deduplication methods in Namespaces async.

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #15608:
URL: https://github.com/apache/pulsar/pull/15608


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org