You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/08/30 03:24:33 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make deletePersistence method async in Namespaces (#17206)
This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 814008066f0 [improve][broker][PIP-149]make deletePersistence method async in Namespaces (#17206)
814008066f0 is described below
commit 814008066f0217099b2b7cf795647403c32d5204
Author: HuangZeGui <10...@qq.com>
AuthorDate: Tue Aug 30 11:24:23 2022 +0800
[improve][broker][PIP-149]make deletePersistence method async in Namespaces (#17206)
* Make the deletePersistence method async in Namespaces
* Update comment
* Remove irrelevant import
* Reduce unnecessary exceptions
* Remove redundant exception log printing logic
Co-authored-by: huangzegui <hu...@didiglobal.com>
---
.../apache/pulsar/broker/admin/impl/NamespacesBase.java | 17 +++++++++++++----
.../org/apache/pulsar/broker/admin/v2/Namespaces.java | 13 ++++++++++---
2 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d8f49be965..08c968a2d5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1543,10 +1543,10 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected void internalDeletePersistence() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
- doUpdatePersistence(null);
+ protected CompletableFuture<Void> internalDeletePersistenceAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> doUpdatePersistenceAsync(null));
}
protected void internalSetPersistence(PersistencePolicies persistence) {
@@ -1572,6 +1572,15 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ private CompletableFuture<Void> doUpdatePersistenceAsync(PersistencePolicies persistence) {
+ return updatePoliciesAsync(namespaceName, policies -> {
+ policies.persistence = persistence;
+ return policies;
+ }).thenAccept(__ -> log.info("[{}] Successfully updated persistence configuration: namespace={}, map={}",
+ clientAppId(), namespaceName, persistence)
+ );
+ }
+
protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 9c24bc65d61..e43e01ced93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1255,10 +1255,17 @@ public class Namespaces extends NamespacesBase {
@Path("/{tenant}/{namespace}/persistence")
@ApiOperation(value = "Delete the persistence configuration for all topics on a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void deletePersistence(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void deletePersistence(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalDeletePersistence();
+ internalDeletePersistenceAsync()
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to delete the persistence for a namespace {}", clientAppId(), namespaceName,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST