You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/07/27 02:06:35 UTC
[pulsar] branch master updated: [improve][broker] make some methods async in Namespaces (#16784)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a3bbb36fda2 [improve][broker] make some methods async in Namespaces (#16784)
a3bbb36fda2 is described below
commit a3bbb36fda26d1f7824077127ebe749d277d7bc8
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Wed Jul 27 10:06:28 2022 +0800
[improve][broker] make some methods async in Namespaces (#16784)
* [improve][broker] make some methods async in Namespaces
* fix unit test
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 20 --------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 31 ++++++++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 60 ++++++++++++++++++----
.../apache/pulsar/broker/admin/NamespacesTest.java | 7 ++-
4 files changed, 79 insertions(+), 39 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5505afc4cf8..838304117bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1584,13 +1584,6 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected PersistencePolicies internalGetPersistence() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.persistence;
- }
-
protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
@@ -1808,12 +1801,6 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected SubscriptionAuthMode internalGetSubscriptionAuthMode() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ);
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.subscription_auth_mode;
- }
-
protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
@@ -1843,13 +1830,6 @@ public abstract class NamespacesBase extends AdminResource {
return getNamespacePolicies(namespaceName).delayed_delivery_policies;
}
- protected InactiveTopicPolicies internalGetInactiveTopic() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.inactive_topic_policies;
- }
-
protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 1c2fb282e10..5084e09fd31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1257,10 +1257,20 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
- public PersistencePolicies getPersistence(@PathParam("property") String property,
+ public void getPersistence(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetPersistence();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.persistence))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get persistence configuration for a namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -1384,11 +1394,20 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get subscription auth mode in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
- public SubscriptionAuthMode getSubscriptionAuthMode(@PathParam("property") String property,
- @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
+ public void getSubscriptionAuthMode(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetSubscriptionAuthMode();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.subscription_auth_mode))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get subscription auth mode in a namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 5417f2e19c1..8158bcc362a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1298,10 +1298,20 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
- public PersistencePolicies getPersistence(@PathParam("tenant") String tenant,
+ public void getPersistence(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetPersistence();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.persistence))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get persistence configuration for a namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -1425,10 +1435,20 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get subscription auth mode in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
- public SubscriptionAuthMode getSubscriptionAuthMode(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void getSubscriptionAuthMode(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetSubscriptionAuthMode();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.subscription_auth_mode))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get subscription auth mode in a namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -1451,10 +1471,19 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get message encryption required status in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
- public Boolean getEncryptionRequired(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void getEncryptionRequired(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetEncryptionRequired();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.encryption_required))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get message encryption required status in a namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -1499,10 +1528,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"), })
- public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetInactiveTopic();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.inactive_topic_policies))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get inactive topic policies config on a namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 2369a0af4bb..1a4372985ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1062,8 +1062,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
NamespaceName testNs = this.testLocalNamespaces.get(0);
PersistencePolicies persistence1 = new PersistencePolicies(3, 2, 1, 0.0);
namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence1);
- PersistencePolicies persistence2 = namespaces.getPersistence(testNs.getTenant(), testNs.getCluster(),
- testNs.getLocalName());
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.getPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName());
+ ArgumentCaptor<PersistencePolicies> captor = ArgumentCaptor.forClass(PersistencePolicies.class);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ PersistencePolicies persistence2 = captor.getValue();
assertEquals(persistence2, persistence1);
}