You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/27 10:26:26 UTC
[pulsar] branch master updated: Fix get non-persistent topics issue in Namespaces. (#16170)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 877795ead64 Fix get non-persistent topics issue in Namespaces. (#16170)
877795ead64 is described below
commit 877795ead640039a0bcb5ef0b9aa190c3536ca1e
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jun 27 18:26:18 2022 +0800
Fix get non-persistent topics issue in Namespaces. (#16170)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 44 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 2 +-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 2 +-
3 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b632eb88509..3886205d776 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -49,6 +49,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
@@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -162,6 +164,48 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies,
+ CommandGetTopicsOfNamespace.Mode mode) {
+ switch (mode) {
+ case ALL:
+ return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+ .thenCombine(internalGetNonPersistentTopics(policies),
+ (persistentTopics, nonPersistentTopics) ->
+ ListUtils.union(persistentTopics, nonPersistentTopics));
+ case NON_PERSISTENT:
+ return internalGetNonPersistentTopics(policies);
+ case PERSISTENT:
+ default:
+ return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+ }
+ }
+
+ protected CompletableFuture<List<String>> internalGetNonPersistentTopics(Policies policies) {
+ final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
+ final List<String> boundaries = policies.bundles.getBoundaries();
+ for (int i = 0; i < boundaries.size() - 1; i++) {
+ final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
+ try {
+ futures.add(pulsar().getAdminClient().topics()
+ .getListInBundleAsync(namespaceName.toString(), bundle));
+ } catch (PulsarServerException e) {
+ throw new RestException(e);
+ }
+ }
+ return FutureUtil.waitForAll(futures)
+ .thenApply(__ -> {
+ final List<String> topics = Lists.newArrayList();
+ for (int i = 0; i < futures.size(); i++) {
+ List<String> topicList = futures.get(i).join();
+ if (topicList != null) {
+ topics.addAll(topicList);
+ }
+ }
+ return topics.stream().filter(name -> !TopicName.get(name).isPersistent())
+ .collect(Collectors.toList());
+ });
+ }
+
@SuppressWarnings("deprecation")
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c04773c222d..e523b0a843e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -147,7 +147,7 @@ public class Namespaces extends NamespacesBase {
validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+ .thenCompose(policies -> internalGetListOfTopics(policies, mode))
.thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
.thenAccept(response::resume)
.exceptionally(ex -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 89c47225552..2325eb704a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -121,7 +121,7 @@ public class Namespaces extends NamespacesBase {
validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+ .thenCompose(policies -> internalGetListOfTopics(policies, mode))
.thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
.thenAccept(response::resume)
.exceptionally(ex -> {