You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/11 03:31:59 UTC
[pulsar] branch branch-2.10 updated: Fix get non-persistent topics issue in Namespaces. (#16170) (#16514)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 22ae18b0d96 Fix get non-persistent topics issue in Namespaces. (#16170) (#16514)
22ae18b0d96 is described below
commit 22ae18b0d96761c9f0876f2d130300ead8d138ef
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jul 11 11:31:52 2022 +0800
Fix get non-persistent topics issue in Namespaces. (#16170) (#16514)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 44 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 10 ++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 10 ++---
3 files changed, 52 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5cdbdc9930a..06e54831461 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -47,6 +47,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
@@ -60,6 +61,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -164,6 +166,48 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies,
+ CommandGetTopicsOfNamespace.Mode mode) {
+ switch (mode) {
+ case ALL:
+ return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+ .thenCombine(internalGetNonPersistentTopics(policies),
+ (persistentTopics, nonPersistentTopics) ->
+ ListUtils.union(persistentTopics, nonPersistentTopics));
+ case NON_PERSISTENT:
+ return internalGetNonPersistentTopics(policies);
+ case PERSISTENT:
+ default:
+ return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+ }
+ }
+
+ protected CompletableFuture<List<String>> internalGetNonPersistentTopics(Policies policies) {
+ final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
+ final List<String> boundaries = policies.bundles.getBoundaries();
+ for (int i = 0; i < boundaries.size() - 1; i++) {
+ final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
+ try {
+ futures.add(pulsar().getAdminClient().topics()
+ .getListInBundleAsync(namespaceName.toString(), bundle));
+ } catch (PulsarServerException e) {
+ throw new RestException(e);
+ }
+ }
+ return FutureUtil.waitForAll(futures)
+ .thenApply(__ -> {
+ final List<String> topics = Lists.newArrayList();
+ for (int i = 0; i < futures.size(); i++) {
+ List<String> topicList = futures.get(i).join();
+ if (topicList != null) {
+ topics.addAll(topicList);
+ }
+ }
+ return topics.stream().filter(name -> !TopicName.get(name).isPersistent())
+ .collect(Collectors.toList());
+ });
+ }
+
@SuppressWarnings("deprecation")
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 276ed51a1ab..e978f9050fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -130,12 +130,10 @@ public class Namespaces extends NamespacesBase {
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(property, cluster, namespace);
- validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS);
-
- // Validate that namespace exists, throws 404 if it doesn't exist
- getNamespacePolicies(namespaceName);
-
- pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+ validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
+ // Validate that namespace exists, throws 404 if it doesn't exist
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies -> internalGetListOfTopics(policies, mode))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 18a8eff4a9a..fbe0918b8a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -101,12 +101,10 @@ public class Namespaces extends NamespacesBase {
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(tenant, namespace);
- validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS);
-
- // Validate that namespace exists, throws 404 if it doesn't exist
- getNamespacePolicies(namespaceName);
-
- pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+ validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
+ // Validate that namespace exists, throws 404 if it doesn't exist
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies -> internalGetListOfTopics(policies, mode))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);