You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/04 08:53:09 UTC
[pulsar] branch master updated: [improve][broker]Make SubscriptionExpirationTime method async (#16328)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3b5de29c49a [improve][broker]Make SubscriptionExpirationTime method async (#16328)
3b5de29c49a is described below
commit 3b5de29c49a4ca9959a3c7b8f0da3ff491cd2e2d
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Mon Jul 4 16:52:58 2022 +0800
[improve][broker]Make SubscriptionExpirationTime method async (#16328)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 24 +++++++------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 42 ++++++++++++++++------
.../apache/pulsar/broker/admin/v2/Namespaces.java | 42 ++++++++++++++++------
3 files changed, 77 insertions(+), 31 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 3886205d776..0472a048f9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -857,17 +857,19 @@ public abstract class NamespacesBase extends AdminResource {
}));
}
- protected void internalSetSubscriptionExpirationTime(Integer expirationTime) {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
-
- if (expirationTime != null && expirationTime < 0) {
- throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time");
- }
- updatePolicies(namespaceName, policies -> {
- policies.subscription_expiration_time_minutes = expirationTime;
- return policies;
- });
+ protected CompletableFuture<Void> internalSetSubscriptionExpirationTimeAsync(Integer expirationTime) {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_EXPIRATION_TIME,
+ PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenAccept(__ -> {
+ if (expirationTime != null && expirationTime < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Invalid value for subscription expiration time");
+ }
+ }).thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.subscription_expiration_time_minutes = expirationTime;
+ return policies;
+ }));
}
protected CompletableFuture<AutoTopicCreationOverride> internalGetAutoTopicCreationAsync() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index e523b0a843e..a2e3970a9ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -470,13 +470,19 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(hidden = true, value = "Get the subscription expiration time for the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
- public Integer getSubscriptionExpirationTime(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
+ public void getSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.subscription_expiration_time_minutes;
+ validateAdminAccessForTenantAsync(property)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.subscription_expiration_time_minutes))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get subscription expiration time for namespace {}: {} ", clientAppId(),
+ namespaceName, ex.getCause().getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -485,10 +491,18 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid expiration time") })
- public void setSubscriptionExpirationTime(@PathParam("property") String property,
+ public void setSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace, int expirationTime) {
validateNamespaceName(property, cluster, namespace);
- internalSetSubscriptionExpirationTime(expirationTime);
+ internalSetSubscriptionExpirationTimeAsync(expirationTime)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to set subscription expiration time for namespace {}: {} ", clientAppId(),
+ namespaceName, ex.getCause().getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -496,10 +510,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(hidden = true, value = "Remove subscription expiration time for namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
- public void removeSubscriptionExpirationTime(@PathParam("property") String property,
+ public void removeSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- internalSetSubscriptionExpirationTime(null);
+ internalSetSubscriptionExpirationTimeAsync(null)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to remove subscription expiration time for namespace {}: {} ", clientAppId(),
+ namespaceName, ex.getCause().getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 2325eb704a2..370466bf8c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -410,13 +410,19 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get the subscription expiration time for the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
- public Integer getSubscriptionExpirationTime(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
+ public void getSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.subscription_expiration_time_minutes;
+ validateAdminAccessForTenantAsync(tenant)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.subscription_expiration_time_minutes))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get subscription expiration time for namespace {}: {} ", clientAppId(),
+ namespaceName, ex.getCause().getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -425,13 +431,21 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid expiration time")})
- public void setSubscriptionExpirationTime(@PathParam("tenant") String tenant,
+ public void setSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value =
"Expiration time in minutes for the specified namespace",
required = true) int expirationTime) {
validateNamespaceName(tenant, namespace);
- internalSetSubscriptionExpirationTime(expirationTime);
+ internalSetSubscriptionExpirationTimeAsync(expirationTime)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to set subscription expiration time for namespace {}: {} ", clientAppId(),
+ namespaceName, ex.getCause().getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -439,10 +453,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Remove subscription expiration time for namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
- public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
+ public void removeSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalSetSubscriptionExpirationTime(null);
+ internalSetSubscriptionExpirationTimeAsync(null)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to remove subscription expiration time for namespace {}: {} ", clientAppId(),
+ namespaceName, ex.getCause().getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET