You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/11 03:41:57 UTC

[pulsar] branch branch-2.9 updated: [Branch-2.9][Cherry-pick] Fix get non-persistent topics issue in Namespaces. (#16517)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 0735f6be652 [Branch-2.9][Cherry-pick] Fix get non-persistent topics issue in Namespaces. (#16517)
0735f6be652 is described below

commit 0735f6be652bae0a8868489c16c55ad0487e6994
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jul 11 11:41:51 2022 +0800

    [Branch-2.9][Cherry-pick] Fix get non-persistent topics issue in Namespaces. (#16517)
    
    * Fix get non-persistent topics issue in Namespaces. (#16170)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 44 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 10 ++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 10 ++---
 .../pulsar/broker/web/PulsarWebResource.java       | 23 +++++++++++
 4 files changed, 75 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index caf75a99d1d..2be1e32ba32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -48,6 +48,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
+import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -61,6 +62,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -163,6 +165,48 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    /** Returns the topics of {@code namespaceName} selected by {@code mode}; unknown modes mean PERSISTENT. */
+    protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies,
+                                                                      CommandGetTopicsOfNamespace.Mode mode) {
+        switch (mode) {
+            case NON_PERSISTENT:
+                return internalGetNonPersistentTopics(policies);
+            case ALL:
+                // Start the persistent lookup first, then merge in the per-bundle non-persistent listing.
+                return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+                        .thenCombine(internalGetNonPersistentTopics(policies), ListUtils::union);
+            case PERSISTENT:
+            default:
+                return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+        }
+    }
+
+    /**
+     * Lists the non-persistent topics of {@code namespaceName} by querying each bundle range in
+     * {@code policies}; an admin-client failure is reported through the returned future.
+     */
+    protected CompletableFuture<List<String>> internalGetNonPersistentTopics(Policies policies) {
+        final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
+        final List<String> boundaries = policies.bundles.getBoundaries();
+        for (int i = 0; i < boundaries.size() - 1; i++) {
+            final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
+            try {
+                futures.add(pulsar().getAdminClient().topics()
+                        .getListInBundleAsync(namespaceName.toString(), bundle));
+            } catch (PulsarServerException e) {
+                // Fail the future instead of throwing, so callers have one error channel.
+                return FutureUtil.failedFuture(new RestException(e));
+            }
+        }
+        return FutureUtil.waitForAll(futures)
+                .thenApply(__ -> futures.stream()
+                        .map(CompletableFuture::join)
+                        .filter(topicList -> topicList != null)
+                        .flatMap(List::stream)
+                        .filter(name -> !TopicName.get(name).isPersistent())
+                        .collect(Collectors.toList()));
+    }
+
     @SuppressWarnings("deprecation")
     protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
         validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index afb6174c550..ab8ca2d65f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -129,12 +129,10 @@ public class Namespaces extends NamespacesBase {
                                   @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
                                   @Suspended AsyncResponse asyncResponse) {
         validateNamespaceName(property, cluster, namespace);
-        validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS);
-
-        // Validate that namespace exists, throws 404 if it doesn't exist
-        getNamespacePolicies(namespaceName);
-
-        pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+        validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
+                // Validate that namespace exists, throws 404 if it doesn't exist
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies -> internalGetListOfTopics(policies, mode))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 4fe4cf31560..89efd8f83de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -99,12 +99,10 @@ public class Namespaces extends NamespacesBase {
                                   @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
                                   @Suspended AsyncResponse asyncResponse) {
         validateNamespaceName(tenant, namespace);
-        validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS);
-
-        // Validate that namespace exists, throws 404 if it doesn't exist
-        getNamespacePolicies(namespaceName);
-
-        pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+        validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
+                // Validate that namespace exists, throws 404 if it doesn't exist
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies -> internalGetListOfTopics(policies, mode))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 20652d14a53..da9a3f060ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -946,6 +946,29 @@ public abstract class PulsarWebResource {
         }
     }
 
+    /** Async namespace-operation check; the future fails with FORBIDDEN when the caller is not permitted. */
+    public CompletableFuture<Void> validateNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   NamespaceOperation operation) {
+        if (!pulsar().getConfiguration().isAuthenticationEnabled()
+                || !pulsar().getBrokerService().isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(null);   // nothing to enforce
+        }
+        if (!isClientAuthenticated(clientAppId())) {
+            return FutureUtil.failedFuture(
+                    new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"));
+        }
+        return pulsar().getBrokerService().getAuthorizationService()
+                .allowNamespaceOperationAsync(namespaceName, operation, originalPrincipal(),
+                        clientAppId(), clientAuthData())
+                .thenAccept(isAuthorized -> {
+                    if (!isAuthorized) {
+                        throw new RestException(Status.FORBIDDEN,
+                                String.format("Unauthorized to validateNamespaceOperation for"
+                                      + " operation [%s] on namespace [%s]", operation.toString(), namespaceName));
+                    }
+                });
+    }
+
     public void validateNamespacePolicyOperation(NamespaceName namespaceName,
                                                  PolicyName policy,
                                                  PolicyOperation operation) {