You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/27 06:53:36 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]make getPartitionedTopicList method async (#16217)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eeb22ba9631 [improve][broker][PIP-149]make getPartitionedTopicList method async (#16217)
eeb22ba9631 is described below

commit eeb22ba9631d86528bbca8f1825feff7e70272d2
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jun 27 14:53:29 2022 +0800

    [improve][broker][PIP-149]make getPartitionedTopicList method async (#16217)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 29 ++++++++++------------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 18 +++++++++++---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 14 +++++++++--
 .../org/apache/pulsar/broker/admin/AdminTest.java  | 11 +++++---
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 26 ++++++++++++++-----
 5 files changed, 67 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c6fe2fef7a8..f2b4d72c7b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -197,22 +197,19 @@ public class PersistentTopicsBase extends AdminResource {
                         !isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList()));
     }
 
-    protected List<String> internalGetPartitionedTopicList() {
-        validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);
-        // Validate that namespace exists, throws 404 if it doesn't exist
-        try {
-            if (!namespaceResources().namespaceExists(namespaceName)) {
-                log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", clientAppId(),
-                        namespaceName);
-                throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
-            }
-        } catch (RestException e) {
-            throw e;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), namespaceName, e);
-            throw new RestException(e);
-        }
-        return getPartitionedTopicList(TopicDomain.getEnum(domain()));
+    protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync() {
+        return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+                .thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
+                .thenCompose(namespaceExists -> {
+                    // Validate that namespace exists, throws 404 if it doesn't exist
+                    if (!namespaceExists) {
+                        log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist",
+                                clientAppId(), namespaceName);
+                        throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+                    } else {
+                        return getPartitionedTopicListAsync(TopicDomain.getEnum(domain()));
+                    }
+                });
     }
 
     protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index e492f799195..99c10c62f4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -24,7 +24,6 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import javax.ws.rs.DELETE;
@@ -89,10 +88,21 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist")})
-    public List<String> getPartitionedTopicList(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+    public void getPartitionedTopicList(
+            @Suspended AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetPartitionedTopicList();
+        internalGetPartitionedTopicListAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 88067344949..3f3c34d78d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -120,7 +120,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error")})
-    public List<String> getPartitionedTopicList(
+    public void getPartitionedTopicList(
+            @Suspended AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -128,7 +129,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Include system topic")
             @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
         validateNamespaceName(tenant, namespace);
-        return filterSystemTopic(internalGetPartitionedTopicList(), includeSystemTopic);
+        internalGetPartitionedTopicListAsync()
+                .thenAccept(partitionedTopicList -> asyncResponse.resume(
+                        filterSystemTopic(partitionedTopicList, includeSystemTopic)))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 0d2631b00a9..425acf6c664 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -794,14 +794,19 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         persistentTopics.getList(response, property, cluster, namespace, null);
         verify(response, times(1)).resume(Lists.newArrayList());
         // create topic
-        assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
+        response = mock(AsyncResponse.class);
+        persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);
+        verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
-        assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
-                .newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
+        response = mock(AsyncResponse.class);
+        persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);
+        verify(response, timeout(5000).times(1))
+                .resume(Lists
+                        .newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
 
         TopicName topicName = TopicName.get("persistent", property, cluster, namespace, topic);
         assertEquals(persistentTopics.getPartitionedTopicMetadata(topicName, true, false).partitions, 5);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 23fee723654..2fd4389146c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -691,16 +691,30 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
-        List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, false);
-
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, false);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        List<String> persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
         Assert.assertEquals(persistentPartitionedTopics.size(), 1);
-        Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.persistent.value());
-        persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, true);
+        Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(),
+                TopicDomain.persistent.value());
+
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, true);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
         Assert.assertEquals(persistentPartitionedTopics.size(), 2);
 
-        List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace, false);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        nonPersistentTopic.getPartitionedTopicList(response, testTenant, testNamespace, false);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        List<String> nonPersistentPartitionedTopics = (List<String>) responseCaptor.getValue();
         Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
-        Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
+        Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(),
+                TopicDomain.non_persistent.value());
     }
 
     @Test