You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/11 03:41:57 UTC
[pulsar] branch branch-2.9 updated: [Branch-2.9][Cherry-pick] Fix get non-persistent topics issue in Namespaces. (#16517)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 0735f6be652 [Branch-2.9][Cherry-pick] Fix get non-persistent topics issue in Namespaces. (#16517)
0735f6be652 is described below
commit 0735f6be652bae0a8868489c16c55ad0487e6994
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jul 11 11:41:51 2022 +0800
[Branch-2.9][Cherry-pick] Fix get non-persistent topics issue in Namespaces. (#16517)
* Fix get non-persistent topics issue in Namespaces. (#16170)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 44 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 10 ++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 10 ++---
.../pulsar/broker/web/PulsarWebResource.java | 23 +++++++++++
4 files changed, 75 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index caf75a99d1d..2be1e32ba32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -48,6 +48,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
@@ -61,6 +62,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -163,6 +165,48 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies,
+ CommandGetTopicsOfNamespace.Mode mode) {
+ switch (mode) {
+ case ALL:
+ return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+ .thenCombine(internalGetNonPersistentTopics(policies),
+ (persistentTopics, nonPersistentTopics) ->
+ ListUtils.union(persistentTopics, nonPersistentTopics));
+ case NON_PERSISTENT:
+ return internalGetNonPersistentTopics(policies);
+ case PERSISTENT:
+ default:
+ return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+ }
+ }
+
+ /**
+  * Collects the non-persistent topics of the current namespace by querying every bundle.
+  *
+  * <p>Each pair of adjacent bundle boundaries is formatted as {@code "<lower>_<upper>"} and passed
+  * to the admin client's {@code getListInBundleAsync}. Once all requests have completed, the
+  * per-bundle lists are concatenated (null lists are skipped) and any name that
+  * {@code TopicName} reports as persistent is dropped.
+  *
+  * @param policies namespace policies whose bundle boundaries define the bundles to query
+  * @return a future completing with the non-persistent topic names
+  * @throws RestException if obtaining the admin client fails with a {@code PulsarServerException}
+  */
+ protected CompletableFuture<List<String>> internalGetNonPersistentTopics(Policies policies) {
+     final List<CompletableFuture<List<String>>> perBundleFutures = Lists.newArrayList();
+     final List<String> bundleBoundaries = policies.bundles.getBoundaries();
+     // Walk adjacent boundary pairs: (0,1), (1,2), ... one request per bundle.
+     for (int upper = 1; upper < bundleBoundaries.size(); upper++) {
+         final String bundleRange = String.format("%s_%s",
+                 bundleBoundaries.get(upper - 1), bundleBoundaries.get(upper));
+         try {
+             perBundleFutures.add(pulsar().getAdminClient().topics()
+                     .getListInBundleAsync(namespaceName.toString(), bundleRange));
+         } catch (PulsarServerException e) {
+             throw new RestException(e);
+         }
+     }
+     return FutureUtil.waitForAll(perBundleFutures)
+             .thenApply(ignore -> perBundleFutures.stream()
+                     // All futures are complete here, so join() does not block.
+                     .map(CompletableFuture::join)
+                     .filter(bundleTopics -> bundleTopics != null)
+                     .flatMap(List::stream)
+                     .filter(topic -> !TopicName.get(topic).isPersistent())
+                     .collect(Collectors.toList()));
+ }
+
@SuppressWarnings("deprecation")
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index afb6174c550..ab8ca2d65f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -129,12 +129,10 @@ public class Namespaces extends NamespacesBase {
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(property, cluster, namespace);
- validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS);
-
- // Validate that namespace exists, throws 404 if it doesn't exist
- getNamespacePolicies(namespaceName);
-
- pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+ validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
+ // Validate that namespace exists, throws 404 if it doesn't exist
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies -> internalGetListOfTopics(policies, mode))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 4fe4cf31560..89efd8f83de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -99,12 +99,10 @@ public class Namespaces extends NamespacesBase {
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(tenant, namespace);
- validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS);
-
- // Validate that namespace exists, throws 404 if it doesn't exist
- getNamespacePolicies(namespaceName);
-
- pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+ validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
+ // Validate that namespace exists, throws 404 if it doesn't exist
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies -> internalGetListOfTopics(policies, mode))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 20652d14a53..da9a3f060ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -946,6 +946,29 @@ public abstract class PulsarWebResource {
}
}
+ /**
+  * Asynchronously checks that the calling client may perform {@code operation} on the namespace.
+  *
+  * <p>When authentication or authorization is disabled the returned future is already complete.
+  * Otherwise it fails with a {@code FORBIDDEN} {@link RestException} if the client is not
+  * authenticated or if the authorization service does not allow the operation.
+  *
+  * @param namespaceName namespace the operation targets
+  * @param operation namespace operation being requested
+  * @return a future that completes normally when the operation is permitted
+  */
+ public CompletableFuture<Void> validateNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                NamespaceOperation operation) {
+     // Nothing to validate unless both authentication and authorization are switched on.
+     if (!pulsar().getConfiguration().isAuthenticationEnabled()
+             || !pulsar().getBrokerService().isAuthorizationEnabled()) {
+         return CompletableFuture.completedFuture(null);
+     }
+
+     if (!isClientAuthenticated(clientAppId())) {
+         return FutureUtil.failedFuture(
+                 new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"));
+     }
+
+     return pulsar().getBrokerService().getAuthorizationService()
+             .allowNamespaceOperationAsync(namespaceName, operation, originalPrincipal(),
+                     clientAppId(), clientAuthData())
+             .thenAccept(isAuthorized -> {
+                 if (isAuthorized) {
+                     return;
+                 }
+                 throw new RestException(Status.FORBIDDEN,
+                         String.format("Unauthorized to validateNamespaceOperation for"
+                                 + " operation [%s] on namespace [%s]", operation.toString(), namespaceName));
+             });
+ }
+
public void validateNamespacePolicyOperation(NamespaceName namespaceName,
PolicyName policy,
PolicyOperation operation) {