You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/18 02:05:55 UTC

[pulsar] branch master updated: [improve][broker] Make some operation deduplication methods in Namespaces async. (#15608)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 132ba4ad8e3 [improve][broker] Make some operation deduplication methods in Namespaces async. (#15608)
132ba4ad8e3 is described below

commit 132ba4ad8e3808923b2c8b0a9fa5db5a8d413542
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Wed May 18 10:05:47 2022 +0800

    [improve][broker] Make some operation deduplication methods in Namespaces async. (#15608)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 21 +++++++-------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 13 +++++++--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 33 ++++++++++++++++++----
 3 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 39a4eb13060..7f842ac93fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -906,13 +906,13 @@ public abstract class NamespacesBase extends AdminResource {
         internalSetAutoSubscriptionCreation(asyncResponse, null);
     }
 
-    protected void internalModifyDeduplication(Boolean enableDeduplication) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        updatePolicies(namespaceName, policies -> {
-            policies.deduplicationEnabled = enableDeduplication;
-            return policies;
-        });
+    protected CompletableFuture<Void> internalModifyDeduplicationAsync(Boolean enableDeduplication) {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+                    policies.deduplicationEnabled = enableDeduplication;
+                    return policies;
+                }));
     }
 
     @SuppressWarnings("deprecation")
@@ -2146,9 +2146,10 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected Boolean internalGetDeduplication() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName).deduplicationEnabled;
+    protected CompletableFuture<Boolean> internalGetDeduplicationAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.deduplicationEnabled);
     }
 
     protected Integer internalGetMaxConsumersPerTopic() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 0ccc505d5e8..8fe1e0525c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -513,10 +513,17 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, boolean enableDeduplication) {
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+                                    @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+                                    boolean enableDeduplication) {
         validateNamespaceName(property, cluster, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 2769b5b3bc3..a2ef19ee462 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -411,9 +411,16 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Failed to get broker deduplication config for namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -421,12 +428,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or enabling broker side deduplication "
                                             + "for all topics in the specified namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -434,9 +448,16 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Remove broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+    public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(null);
+        internalModifyDeduplicationAsync(null)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET