You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/07/27 02:06:35 UTC

[pulsar] branch master updated: [improve][broker] make some methods async in Namespaces (#16784)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a3bbb36fda2 [improve][broker] make some methods async in Namespaces (#16784)
a3bbb36fda2 is described below

commit a3bbb36fda26d1f7824077127ebe749d277d7bc8
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Wed Jul 27 10:06:28 2022 +0800

    [improve][broker] make some methods async in Namespaces (#16784)
    
    * [improve][broker] make some methods async in Namespaces
    
    * fix unit test
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 20 --------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 31 ++++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 60 ++++++++++++++++++----
 .../apache/pulsar/broker/admin/NamespacesTest.java |  7 ++-
 4 files changed, 79 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5505afc4cf8..838304117bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1584,13 +1584,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected PersistencePolicies internalGetPersistence() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.persistence;
-    }
-
     protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
 
@@ -1808,12 +1801,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected SubscriptionAuthMode internalGetSubscriptionAuthMode() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.subscription_auth_mode;
-    }
-
     protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
@@ -1843,13 +1830,6 @@ public abstract class NamespacesBase extends AdminResource {
         return getNamespacePolicies(namespaceName).delayed_delivery_policies;
     }
 
-    protected InactiveTopicPolicies internalGetInactiveTopic() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.inactive_topic_policies;
-    }
-
     protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) {
         validateSuperUserAccess();
         validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 1c2fb282e10..5084e09fd31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1257,10 +1257,20 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 409, message = "Concurrent modification") })
-    public PersistencePolicies getPersistence(@PathParam("property") String property,
+    public void getPersistence(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetPersistence();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.persistence))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get persistence configuration for a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1384,11 +1394,20 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get subscription auth mode in a namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
-    public SubscriptionAuthMode getSubscriptionAuthMode(@PathParam("property") String property,
-                                                        @PathParam("cluster") String cluster,
-                                                        @PathParam("namespace") String namespace) {
+    public void getSubscriptionAuthMode(@Suspended final AsyncResponse asyncResponse,
+                                        @PathParam("property") String property,
+                                        @PathParam("cluster") String cluster,
+                                        @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetSubscriptionAuthMode();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.subscription_auth_mode))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get subscription auth mode in a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 5417f2e19c1..8158bcc362a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1298,10 +1298,20 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 409, message = "Concurrent modification") })
-    public PersistencePolicies getPersistence(@PathParam("tenant") String tenant,
+    public void getPersistence(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetPersistence();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.persistence))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get persistence configuration for a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1425,10 +1435,20 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get subscription auth mode in a namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
-    public SubscriptionAuthMode getSubscriptionAuthMode(@PathParam("tenant") String tenant,
-                                           @PathParam("namespace") String namespace) {
+    public void getSubscriptionAuthMode(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetSubscriptionAuthMode();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.subscription_auth_mode))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get subscription auth mode in a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1451,10 +1471,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get message encryption required status in a namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
-    public Boolean getEncryptionRequired(@PathParam("tenant") String tenant,
-                                          @PathParam("namespace") String namespace) {
+    public void getEncryptionRequired(@Suspended AsyncResponse asyncResponse,
+                                      @PathParam("tenant") String tenant,
+                                      @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetEncryptionRequired();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.encryption_required))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get message encryption required status in a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -1499,10 +1528,19 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Concurrent modification"), })
-    public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant") String tenant,
-                                                              @PathParam("namespace") String namespace) {
+    public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
+                                         @PathParam("tenant") String tenant,
+                                         @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetInactiveTopic();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.inactive_topic_policies))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get inactive topic policies config on a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 2369a0af4bb..1a4372985ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1062,8 +1062,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         NamespaceName testNs = this.testLocalNamespaces.get(0);
         PersistencePolicies persistence1 = new PersistencePolicies(3, 2, 1, 0.0);
         namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence1);
-        PersistencePolicies persistence2 = namespaces.getPersistence(testNs.getTenant(), testNs.getCluster(),
-                testNs.getLocalName());
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.getPersistence(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName());
+        ArgumentCaptor<PersistencePolicies> captor = ArgumentCaptor.forClass(PersistencePolicies.class);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        PersistencePolicies persistence2 =  captor.getValue();
         assertEquals(persistence2, persistence1);
     }