You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/26 08:31:05 UTC

[pulsar] branch master updated: [feature][CLI] Add query options to the get topics in Namespace. (#16167)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c715e08c67 [feature][CLI] Add query options to the get topics in Namespace. (#16167)
6c715e08c67 is described below

commit 6c715e08c67c40221c8c08cbe0b31d3a6aa1bf32
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Jun 26 16:30:58 2022 +0800

    [feature][CLI] Add query options to the get topics in Namespace. (#16167)
    
    ### Motivation
    
    Since #2025 has added query param - `Mode`,  but the CLI is still not supported.
    
    #15410 has introduced to filter system topic.
    
    So this patch is supported the above query params for Namespace#getTopics.
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  6 +++
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ---
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  5 ++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  5 ++-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 24 ++++++++++++
 .../BrokerServiceAutoTopicCreationTest.java        | 12 ++++--
 .../client/admin/ListNamespaceTopicsOptions.java   | 39 +++++++++++++++++++
 .../java/org/apache/pulsar/client/admin/Mode.java  | 44 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java | 45 ++++++++++++++++++++++
 .../client/admin/internal/NamespacesImpl.java      | 31 +++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 16 +++++++-
 12 files changed, 223 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 31c87055838..e76834a8580 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -846,4 +846,10 @@ public abstract class AdminResource extends PulsarWebResource {
     protected static String getSubNotFoundErrorMessage(String topic, String subscription) {
         return String.format("Subscription %s not found for topic %s", subscription, topic);
     }
+
+    protected List<String> filterSystemTopic(List<String> topics, boolean includeSystemTopic) {
+        return topics.stream()
+                .filter(topic -> includeSystemTopic ? true : !pulsar().getBrokerService().isSystemTopic(topic))
+                .collect(Collectors.toList());
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 5ec93241921..c6fe2fef7a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5278,12 +5278,6 @@ public class PersistentTopicsBase extends AdminResource {
                         }));
     }
 
-    protected List<String> filterSystemTopic(List<String> topics, boolean includeSystemTopic) {
-        return topics.stream()
-                .filter(topic -> includeSystemTopic ? true : !pulsar().getBrokerService().isSystemTopic(topic))
-                .collect(Collectors.toList());
-    }
-
     protected CompletableFuture<Boolean> internalGetSchemaValidationEnforced(boolean applied) {
         return getTopicPoliciesAsyncWithRetry(topicName)
                 .thenApply(op -> op.map(TopicPolicies::getSchemaValidationEnforced).orElseGet(() -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index beca9529ae0..c04773c222d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -140,12 +140,15 @@ public class Namespaces extends NamespacesBase {
                           @PathParam("property") String property,
                           @PathParam("cluster") String cluster,
                           @PathParam("namespace") String namespace,
-                          @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
+                          @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
+                          @ApiParam(value = "Include system topic")
+                          @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
         validateNamespaceName(property, cluster, namespace);
         validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
                 // Validate that namespace exists, throws 404 if it doesn't exist
                 .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
                 .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+                .thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
                 .thenAccept(response::resume)
                 .exceptionally(ex -> {
                     log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 5191a1170ec..89c47225552 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -114,12 +114,15 @@ public class Namespaces extends NamespacesBase {
     public void getTopics(@Suspended AsyncResponse response,
                           @PathParam("tenant") String tenant,
                           @PathParam("namespace") String namespace,
-                          @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
+                          @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
+                          @ApiParam(value = "Include system topic")
+                          @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
         validateNamespaceName(tenant, namespace);
         validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
                 // Validate that namespace exists, throws 404 if it doesn't exist
                 .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
                 .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+                .thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
                 .thenAccept(response::resume)
                 .exceptionally(ex -> {
                     log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index f568882384a..ef335d3eba2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -61,6 +61,8 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
+import org.apache.pulsar.client.admin.Mode;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
@@ -2509,4 +2511,26 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
             assertEquals(admin.topics().getSchemaValidationEnforced(topic, false), true)
         );
     }
+
+    @Test
+    public void testGetNamespaceTopicList() throws Exception {
+        final String persistentTopic = "persistent://prop-xyz/ns1/testGetNamespaceTopicList";
+        final String nonPersistentTopic = "non-persistent://prop-xyz/ns1/non-testGetNamespaceTopicList";
+        final String eventTopic = "persistent://prop-xyz/ns1/__change_events";
+        admin.topics().createNonPartitionedTopic(persistentTopic);
+        Awaitility.await().untilAsserted(() ->
+                admin.namespaces().getTopics("prop-xyz/ns1",
+                ListNamespaceTopicsOptions.builder().mode(Mode.PERSISTENT).includeSystemTopic(true).build())
+                        .contains(eventTopic));
+        List<String> notIncludeSystemTopics = admin.namespaces().getTopics("prop-xyz/ns1",
+                ListNamespaceTopicsOptions.builder().includeSystemTopic(false).build());
+        Assert.assertFalse(notIncludeSystemTopics.contains(eventTopic));
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(nonPersistentTopic)
+                .create();
+        List<String> notPersistentTopics = admin.namespaces().getTopics("prop-xyz/ns1",
+                ListNamespaceTopicsOptions.builder().mode(Mode.NON_PERSISTENT).build());
+        Assert.assertTrue(notPersistentTopics.contains(nonPersistentTopic));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index fde4b703c44..44a52d4b85e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -22,8 +22,11 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
@@ -399,7 +402,8 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
 
         pulsarClient.newProducer().topic(topicString).create();
 
-        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc",
+                ListNamespaceTopicsOptions.builder().includeSystemTopic(true).build()).contains(topicString));
         assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
     }
 
@@ -409,9 +413,11 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
 
         final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
 
-        pulsarClient.newProducer().topic(topicString).create();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicString).create();
 
-        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc",
+                ListNamespaceTopicsOptions.builder().includeSystemTopic(true).build()).contains(topicString));
         assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
     }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListNamespaceTopicsOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListNamespaceTopicsOptions.java
new file mode 100644
index 00000000000..8c5bc4b9741
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListNamespaceTopicsOptions.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class ListNamespaceTopicsOptions {
+
+    public static final ListNamespaceTopicsOptions EMPTY = ListNamespaceTopicsOptions.builder().build();
+
+    /**
+     * Set to true to get topics including system topic, otherwise not.
+     */
+    private final boolean includeSystemTopic;
+
+    /**
+     * Allowed topic domain mode (persistent, non_persistent, all).
+     */
+    private final Mode mode;
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Mode.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Mode.java
new file mode 100644
index 00000000000..7f732d9052e
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Mode.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+public enum Mode {
+
+    PERSISTENT(0), NON_PERSISTENT(1), ALL(2),;
+    private final int value;
+    private Mode(int value) {
+        this.value = value;
+    }
+    public int getValue() {
+        return value;
+    }
+    public static Mode valueOf(int n) {
+        switch (n) {
+            case 0 :
+                return PERSISTENT;
+            case 1 :
+                return NON_PERSISTENT;
+            case 2 :
+                return ALL;
+            default :
+                return null;
+
+        }
+    }
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index f279f70839c..39cf49cf961 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -162,6 +162,51 @@ public interface Namespaces {
      */
     CompletableFuture<List<String>> getTopicsAsync(String namespace);
 
+    /**
+     * Get the list of topics.
+     * <p/>
+     * Get the list of all the topics under a certain namespace.
+     * <p/>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["persistent://my-tenant/use/namespace1/my-topic-1",
+     *  "persistent://my-tenant/use/namespace1/my-topic-2"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param options
+     *            List namespace topics options
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getTopics(String namespace, ListNamespaceTopicsOptions options) throws PulsarAdminException;
+
+    /**
+     * Get the list of topics asynchronously.
+     * <p/>
+     * Get the list of all the topics under a certain namespace.
+     * <p/>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["persistent://my-tenant/use/namespace1/my-topic-1",
+     *  "persistent://my-tenant/use/namespace1/my-topic-2"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param options
+     *            List namespace topics options
+     */
+    CompletableFuture<List<String>> getTopicsAsync(String namespace, ListNamespaceTopicsOptions options);
+
     /**
      * Get the list of bundles.
      * <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 2058be5c199..d9672b9e196 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -29,6 +29,7 @@ import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
@@ -169,6 +170,36 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         return future;
     }
 
+    @Override
+    public List<String> getTopics(String namespace, ListNamespaceTopicsOptions options)
+            throws PulsarAdminException{
+        return sync(() -> getTopicsAsync(namespace, options));
+    }
+
+    @Override
+    public CompletableFuture<List<String>> getTopicsAsync(String namespace, ListNamespaceTopicsOptions options) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        String action = ns.isV2() ? "topics" : "destinations";
+        WebTarget path = namespacePath(ns, action);
+        path = path
+                .queryParam("mode", options.getMode())
+                .queryParam("includeSystemTopic", options.isIncludeSystemTopic());
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<List<String>>() {
+                    @Override
+                    public void completed(List<String> topics) {
+                        future.complete(topics);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
     @Override
     public Policies getPolicies(String namespace) throws PulsarAdminException {
         return sync(() -> getPoliciesAsync(namespace));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7a805eb6659..e63509854e4 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.admin.Bookies;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.Brokers;
 import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.Lookup;
@@ -344,7 +345,7 @@ public class PulsarAdminToolTest {
         verify(mockNamespaces).getNamespaces("myprop", "clust");
 
         namespaces.run(split("topics myprop/clust/ns1"));
-        verify(mockNamespaces).getTopics("myprop/clust/ns1");
+        verify(mockNamespaces).getTopics("myprop/clust/ns1", ListNamespaceTopicsOptions.builder().build());
 
         namespaces.run(split("policies myprop/clust/ns1"));
         verify(mockNamespaces).getPolicies("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index ed0980b3ed6..8b4679e43c5 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -37,6 +37,8 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.admin.cli.utils.IOUtils;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
+import org.apache.pulsar.client.admin.Mode;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -96,10 +98,22 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(description = "tenant/namespace", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = {"-m", "--mode"},
+                description = "Allowed topic domain mode (persistent, non_persistent, all).")
+        private Mode mode;
+
+        @Parameter(names = { "-ist",
+                "--include-system-topic" }, description = "Include system topic")
+        private boolean includeSystemTopic;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            print(getAdmin().namespaces().getTopics(namespace));
+            ListNamespaceTopicsOptions options = ListNamespaceTopicsOptions.builder()
+                    .mode(mode)
+                    .includeSystemTopic(includeSystemTopic)
+                    .build();
+            print(getAdmin().namespaces().getTopics(namespace, options));
         }
     }