You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text export.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/08/30 03:24:33 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]make deletePersistence method async in Namespaces (#17206)

This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 814008066f0 [improve][broker][PIP-149]make deletePersistence method async in Namespaces (#17206)
814008066f0 is described below

commit 814008066f0217099b2b7cf795647403c32d5204
Author: HuangZeGui <10...@qq.com>
AuthorDate: Tue Aug 30 11:24:23 2022 +0800

    [improve][broker][PIP-149]make deletePersistence method async in Namespaces (#17206)
    
    * make deletePersistence method async in Namespaces
    
    * update comment
    
    * remove irrelevant import
    
    * reduce unnecessary exceptions
    
    * remove redundant exception log printing logic
    
    Co-authored-by: huangzegui <hu...@didiglobal.com>
---
 .../apache/pulsar/broker/admin/impl/NamespacesBase.java | 17 +++++++++++++----
 .../org/apache/pulsar/broker/admin/v2/Namespaces.java   | 13 ++++++++++---
 2 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d8f49be965..08c968a2d5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1543,10 +1543,10 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected void internalDeletePersistence() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        doUpdatePersistence(null);
+    protected CompletableFuture<Void> internalDeletePersistenceAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> doUpdatePersistenceAsync(null));
     }
 
     protected void internalSetPersistence(PersistencePolicies persistence) {
@@ -1572,6 +1572,15 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    private CompletableFuture<Void> doUpdatePersistenceAsync(PersistencePolicies persistence) {
+        return updatePoliciesAsync(namespaceName, policies -> {
+            policies.persistence = persistence;
+            return policies;
+        }).thenAccept(__ -> log.info("[{}] Successfully updated persistence configuration: namespace={}, map={}",
+                clientAppId(), namespaceName, persistence)
+        );
+    }
+
     protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 9c24bc65d61..e43e01ced93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1255,10 +1255,17 @@ public class Namespaces extends NamespacesBase {
     @Path("/{tenant}/{namespace}/persistence")
     @ApiOperation(value = "Delete the persistence configuration for all topics on a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public void deletePersistence(@PathParam("tenant") String tenant,
-                                               @PathParam("namespace") String namespace) {
+    public void deletePersistence(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+                                  @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalDeletePersistence();
+        internalDeletePersistenceAsync()
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to delete the persistence for a namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST