You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/04 08:53:09 UTC

[pulsar] branch master updated: [improve][broker]Make SubscriptionExpirationTime method async (#16328)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b5de29c49a [improve][broker]Make SubscriptionExpirationTime method async (#16328)
3b5de29c49a is described below

commit 3b5de29c49a4ca9959a3c7b8f0da3ff491cd2e2d
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Mon Jul 4 16:52:58 2022 +0800

    [improve][broker]Make SubscriptionExpirationTime method async (#16328)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 24 +++++++------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 42 ++++++++++++++++------
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 42 ++++++++++++++++------
 3 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 3886205d776..0472a048f9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -857,17 +857,19 @@ public abstract class NamespacesBase extends AdminResource {
                 }));
     }
 
-    protected void internalSetSubscriptionExpirationTime(Integer expirationTime) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-
-        if (expirationTime != null && expirationTime < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time");
-        }
-        updatePolicies(namespaceName, policies -> {
-            policies.subscription_expiration_time_minutes = expirationTime;
-            return policies;
-        });
+    protected CompletableFuture<Void> internalSetSubscriptionExpirationTimeAsync(Integer expirationTime) {
+        return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_EXPIRATION_TIME,
+                PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenAccept(__ -> {
+                    if (expirationTime != null && expirationTime < 0) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Invalid value for subscription expiration time");
+                    }
+                }).thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+                    policies.subscription_expiration_time_minutes = expirationTime;
+                    return policies;
+                }));
     }
 
     protected CompletableFuture<AutoTopicCreationOverride> internalGetAutoTopicCreationAsync() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index e523b0a843e..a2e3970a9ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -470,13 +470,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Get the subscription expiration time for the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
-    public Integer getSubscriptionExpirationTime(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(property);
+    public void getSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+                                              @PathParam("property") String property,
+        @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.subscription_expiration_time_minutes;
+        validateAdminAccessForTenantAsync(property)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.subscription_expiration_time_minutes))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get subscription expiration time for namespace {}: {} ", clientAppId(),
+                            namespaceName, ex.getCause().getMessage(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -485,10 +491,18 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Invalid expiration time") })
-    public void setSubscriptionExpirationTime(@PathParam("property") String property,
+    public void setSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+                                              @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, int expirationTime) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetSubscriptionExpirationTime(expirationTime);
+        internalSetSubscriptionExpirationTimeAsync(expirationTime)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to set subscription expiration time for namespace {}: {} ", clientAppId(),
+                            namespaceName, ex.getCause().getMessage(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -496,10 +510,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Remove subscription expiration time for namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
-    public void removeSubscriptionExpirationTime(@PathParam("property") String property,
+    public void removeSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+                                                 @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetSubscriptionExpirationTime(null);
+        internalSetSubscriptionExpirationTimeAsync(null)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to remove subscription expiration time for namespace {}: {} ", clientAppId(),
+                            namespaceName, ex.getCause().getMessage(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 2325eb704a2..370466bf8c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -410,13 +410,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get the subscription expiration time for the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public Integer getSubscriptionExpirationTime(@PathParam("tenant") String tenant,
-            @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(tenant);
+    public void getSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+                                                 @PathParam("tenant") String tenant,
+                                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.subscription_expiration_time_minutes;
+        validateAdminAccessForTenantAsync(tenant)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.subscription_expiration_time_minutes))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get subscription expiration time for namespace {}: {} ", clientAppId(),
+                            namespaceName, ex.getCause().getMessage(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -425,13 +431,21 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Invalid expiration time")})
-    public void setSubscriptionExpirationTime(@PathParam("tenant") String tenant,
+    public void setSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+                                              @PathParam("tenant") String tenant,
                                               @PathParam("namespace") String namespace,
                                               @ApiParam(value =
                                                       "Expiration time in minutes for the specified namespace",
                                                       required = true) int expirationTime) {
         validateNamespaceName(tenant, namespace);
-        internalSetSubscriptionExpirationTime(expirationTime);
+        internalSetSubscriptionExpirationTimeAsync(expirationTime)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to set subscription expiration time for namespace {}: {} ", clientAppId(),
+                            namespaceName, ex.getCause().getMessage(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -439,10 +453,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Remove subscription expiration time for namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
-    public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
+    public void removeSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+                                                 @PathParam("tenant") String tenant,
                                                  @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalSetSubscriptionExpirationTime(null);
+        internalSetSubscriptionExpirationTimeAsync(null)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to remove subscription expiration time for namespace {}: {} ", clientAppId(),
+                            namespaceName, ex.getCause().getMessage(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET