You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/27 10:26:26 UTC

[pulsar] branch master updated: Fix get non-persistent topics issue in Namespaces. (#16170)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 877795ead64 Fix get non-persistent topics issue in Namespaces. (#16170)
877795ead64 is described below

commit 877795ead640039a0bcb5ef0b9aa190c3536ca1e
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jun 27 18:26:18 2022 +0800

    Fix get non-persistent topics issue in Namespaces. (#16170)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 44 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  2 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  2 +-
 3 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b632eb88509..3886205d776 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -49,6 +49,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -162,6 +164,48 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies,
+                                                                      CommandGetTopicsOfNamespace.Mode mode) {
+        switch (mode) {
+            case ALL:
+                return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+                        .thenCombine(internalGetNonPersistentTopics(policies),
+                                (persistentTopics, nonPersistentTopics) ->
+                                        ListUtils.union(persistentTopics, nonPersistentTopics));
+            case NON_PERSISTENT:
+                return internalGetNonPersistentTopics(policies);
+            case PERSISTENT:
+            default:
+                return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+        }
+    }
+
+    protected CompletableFuture<List<String>> internalGetNonPersistentTopics(Policies policies) {
+        final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
+        final List<String> boundaries = policies.bundles.getBoundaries();
+        for (int i = 0; i < boundaries.size() - 1; i++) {
+            final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
+            try {
+                futures.add(pulsar().getAdminClient().topics()
+                        .getListInBundleAsync(namespaceName.toString(), bundle));
+            } catch (PulsarServerException e) {
+                throw new RestException(e);
+            }
+        }
+        return FutureUtil.waitForAll(futures)
+                .thenApply(__ -> {
+                    final List<String> topics = Lists.newArrayList();
+                    for (int i = 0; i < futures.size(); i++) {
+                        List<String> topicList = futures.get(i).join();
+                        if (topicList != null) {
+                            topics.addAll(topicList);
+                        }
+                    }
+                    return topics.stream().filter(name -> !TopicName.get(name).isPersistent())
+                            .collect(Collectors.toList());
+                });
+    }
+
     @SuppressWarnings("deprecation")
     protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
         validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c04773c222d..e523b0a843e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -147,7 +147,7 @@ public class Namespaces extends NamespacesBase {
         validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
                 // Validate that namespace exists, throws 404 if it doesn't exist
                 .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+                .thenCompose(policies -> internalGetListOfTopics(policies, mode))
                 .thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
                 .thenAccept(response::resume)
                 .exceptionally(ex -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 89c47225552..2325eb704a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -121,7 +121,7 @@ public class Namespaces extends NamespacesBase {
         validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
                 // Validate that namespace exists, throws 404 if it doesn't exist
                 .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+                .thenCompose(policies -> internalGetListOfTopics(policies, mode))
                 .thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
                 .thenAccept(response::resume)
                 .exceptionally(ex -> {