You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/18 02:05:55 UTC
[pulsar] branch master updated: [improve][broker] Make some operation deduplication methods in Namespaces async. (#15608)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 132ba4ad8e3 [improve][broker] Make some operation deduplication methods in Namespaces async. (#15608)
132ba4ad8e3 is described below
commit 132ba4ad8e3808923b2c8b0a9fa5db5a8d413542
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Wed May 18 10:05:47 2022 +0800
[improve][broker] Make some operation deduplication methods in Namespaces async. (#15608)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 21 +++++++-------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 13 +++++++--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 33 ++++++++++++++++++----
3 files changed, 48 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 39a4eb13060..7f842ac93fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -906,13 +906,13 @@ public abstract class NamespacesBase extends AdminResource {
internalSetAutoSubscriptionCreation(asyncResponse, null);
}
- protected void internalModifyDeduplication(Boolean enableDeduplication) {
- validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
- updatePolicies(namespaceName, policies -> {
- policies.deduplicationEnabled = enableDeduplication;
- return policies;
- });
+ protected CompletableFuture<Void> internalModifyDeduplicationAsync(Boolean enableDeduplication) {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.deduplicationEnabled = enableDeduplication;
+ return policies;
+ }));
}
@SuppressWarnings("deprecation")
@@ -2146,9 +2146,10 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected Boolean internalGetDeduplication() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ);
- return getNamespacePolicies(namespaceName).deduplicationEnabled;
+ protected CompletableFuture<Boolean> internalGetDeduplicationAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies -> policies.deduplicationEnabled);
}
protected Integer internalGetMaxConsumersPerTopic() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 0ccc505d5e8..8fe1e0525c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -513,10 +513,17 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
- public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, boolean enableDeduplication) {
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+ boolean enableDeduplication) {
validateNamespaceName(property, cluster, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 2769b5b3bc3..a2ef19ee462 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -411,9 +411,16 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
- public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+ public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetDeduplication();
+ internalGetDeduplicationAsync()
+ .thenAccept(deduplication -> asyncResponse.resume(deduplication))
+ .exceptionally(ex -> {
+ log.error("Failed to get broker deduplication config for namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -421,12 +428,19 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
- public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Flag for disabling or enabling broker side deduplication "
+ "for all topics in the specified namespace", required = true)
boolean enableDeduplication) {
validateNamespaceName(tenant, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -434,9 +448,16 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Remove broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
- public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalModifyDeduplication(null);
+ internalModifyDeduplicationAsync(null)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET