You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/30 02:42:24 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]Make getList async (#16221)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 92d229db95c [improve][broker][PIP-149]Make getList async (#16221)
92d229db95c is described below

commit 92d229db95c61ef70861f18b3cb91b0c3963a3ce
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Thu Jun 30 10:42:17 2022 +0800

    [improve][broker][PIP-149]Make getList async (#16221)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 38 +++++++++++++++++++---
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 18 +++++-----
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 18 +++++-----
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  2 +-
 4 files changed, 55 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4c5c4a836a7..5815e31d535 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -183,6 +183,32 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) { // async topic listing
+        return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+            .thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
+            .thenAccept(exists -> { // throwing in this stage fails the returned future
+                if (!exists) {
+                    throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+                }
+            })
+            .thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
+            .thenApply(topics ->
+                topics.stream()
+                    .filter(topic -> { // returning true keeps the topic in the result
+                        if (isTransactionInternalName(TopicName.get(topic))) {
+                            return false; // transaction-internal topics are never listed
+                        }
+                        if (bundle.isPresent()) { // optional filter: match on bundle range
+                            NamespaceBundle b = pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                .getBundle(TopicName.get(topic));
+                            return b != null && bundle.get().equals(b.getBundleRange()); // null bundle => excluded
+                        }
+                        return true; // no bundle filter requested
+                    })
+                    .collect(Collectors.toList())
+            );
+    }
+
     protected CompletableFuture<List<String>> internalGetListAsync() {
         return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
                 .thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
@@ -4249,18 +4275,22 @@ public class PersistentTopicsBase extends AdminResource {
 
         return getPartitionedTopicMetadataAsync(
                 TopicName.get(topicName.getPartitionedTopicName()), false, false)
-                .thenApply(partitionedTopicMetadata -> {
+                .thenAccept(partitionedTopicMetadata -> {
                     if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
                         final String topicErrorType = partitionedTopicMetadata
                                 == null ? "has no metadata" : "has zero partitions";
                         throw new RestException(Status.NOT_FOUND, String.format(
                                 "Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
-                    } else if (!internalGetList(Optional.empty()).contains(topicName.toString())) {
+                    }
+                })
+                .thenCompose(__ -> internalGetListAsync(Optional.empty()))
+                .thenApply(topics -> {
+                    if (!topics.contains(topicName.toString())) {
                         throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
                     }
                     throw new RestException(Status.NOT_FOUND,
-                            getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
-                });
+                        getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
+            });
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 028c17da3d3..a114bf54cca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -71,14 +71,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify the bundle name", required = false)
             @QueryParam("bundle") String bundle) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalGetListAsync(Optional.ofNullable(bundle))
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1fa09d747a1..bac4145871b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -100,14 +100,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("bundle") String bundle,
             @ApiParam(value = "Include system topic")
             @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            asyncResponse.resume(filterSystemTopic(internalGetList(Optional.ofNullable(bundle)), includeSystemTopic));
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalGetListAsync(Optional.ofNullable(bundle))
+            .thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 425acf6c664..2a4e204c044 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -792,7 +792,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         AsyncResponse response = mock(AsyncResponse.class);
         persistentTopics.getList(response, property, cluster, namespace, null);
-        verify(response, times(1)).resume(Lists.newArrayList());
+        verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
         // create topic
         response = mock(AsyncResponse.class);
         persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);