You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/08 04:46:05 UTC

[pulsar] branch master updated: [improve][CLI] Support filtering system topic when get list. (#15410)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a0dcab679b1 [improve][CLI] Support filtering system topic when get list. (#15410)
a0dcab679b1 is described below

commit a0dcab679b1842a2efee4df2f31bf41ef5ab1dd0
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun May 8 12:45:54 2022 +0800

    [improve][CLI] Support filtering system topic when get list. (#15410)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++
 .../broker/admin/v2/NonPersistentTopics.java       |  6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 12 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 61 ++++++++++++++++++-
 .../systopic/PartitionedSystemTopicTest.java       |  8 ++-
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pulsar/client/admin/ListTopicsOptions.java     | 40 +++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 70 +++++++++++++++++++---
 .../pulsar/client/admin/internal/TopicsImpl.java   | 58 ++++++++++++++----
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  7 ++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 22 +++++--
 .../org/apache/pulsar/admin/cli/TestCmdTopics.java |  3 +-
 12 files changed, 254 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 847406dfc89..033aa41cb52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4990,4 +4990,10 @@ public class PersistentTopicsBase extends AdminResource {
                                     .updateTopicPoliciesAsync(topicName, topicPolicies);
                         }));
     }
+
+    /** Drops system topics from {@code topics} unless {@code includeSystemTopic} is {@code true}. */
+    protected List<String> filterSystemTopic(List<String> topics, boolean includeSystemTopic) {
+        return topics.stream().filter(t -> includeSystemTopic || !pulsar().getBrokerService().isSystemTopic(t))
+                .collect(Collectors.toList());
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 5cf78278350..ec88906cc94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -378,7 +378,9 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify the bundle name", required = false)
-            @QueryParam("bundle") String nsBundle) {
+            @QueryParam("bundle") String nsBundle,
+            @ApiParam(value = "Include system topic")
+            @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
         Policies policies = null;
         try {
             validateNamespaceName(tenant, namespace);
@@ -430,7 +432,7 @@ public class NonPersistentTopics extends PersistentTopics {
                         topics.stream()
                                 .filter(name -> !TopicName.get(name).isPersistent())
                                 .collect(Collectors.toList());
-                asyncResponse.resume(nonPersistentTopics);
+                asyncResponse.resume(filterSystemTopic(nonPersistentTopics, includeSystemTopic));
             }
         });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index b6424f5c425..953b0f5f821 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -99,10 +99,12 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify the bundle name", required = false)
-            @QueryParam("bundle") String bundle) {
+            @QueryParam("bundle") String bundle,
+            @ApiParam(value = "Include system topic")
+            @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
         try {
             validateNamespaceName(tenant, namespace);
-            asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
+            asyncResponse.resume(filterSystemTopic(internalGetList(Optional.ofNullable(bundle)), includeSystemTopic));
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
@@ -124,9 +126,11 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
-            @PathParam("namespace") String namespace) {
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Include system topic")
+            @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
         validateNamespaceName(tenant, namespace);
-        return internalGetPartitionedTopicList();
+        return filterSystemTopic(internalGetPartitionedTopicList(), includeSystemTopic);
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index b1bc4ab5486..330d945f651 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -152,7 +152,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         admin.clusters().createCluster("test",ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
         admin.tenants().createTenant(this.testTenant,
                 new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test")));
+        admin.tenants().createTenant("pulsar",
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test")));
         admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet(testLocalCluster, "test"));
+        admin.namespaces().createNamespace("pulsar/system", 4);
     }
 
     @Override
@@ -626,16 +629,70 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
-        List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "__change_events", 3, true);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+        List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, false);
 
         Assert.assertEquals(persistentPartitionedTopics.size(), 1);
         Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.persistent.value());
+        persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, true);
+        Assert.assertEquals(persistentPartitionedTopics.size(), 2);
 
-        List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace);
+        List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace, false);
         Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
         Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
     }
 
+    @Test
+    public void testGetList() throws Exception {
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic-1", 1, true);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "__change_events", 1, true);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getList(response, testTenant, testNamespace, null, false);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        List<String> topics = (List<String>) responseCaptor.getValue();
+        Assert.assertEquals(topics.size(), 1);
+
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getList(response, testTenant, testNamespace, null, true);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        topics = (List<String>) responseCaptor.getValue();
+        Assert.assertEquals(topics.size(), 2);
+
+        nonPersistentTopic.createNonPartitionedTopic(testTenant, testNamespace, "test-topic-2", false, null);
+        nonPersistentTopic.createNonPartitionedTopic(testTenant, testNamespace, "__change_events", false, null);
+
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        nonPersistentTopic.getList(response, testTenant, testNamespace, null, false);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        topics = (List<String>) responseCaptor.getValue();
+        Assert.assertEquals(topics.size(), 1);
+
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        nonPersistentTopic.getList(response, testTenant, testNamespace, null, true);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        topics = (List<String>) responseCaptor.getValue();
+        Assert.assertEquals(topics.size(), 2);
+    }
+
     @Test
     public void testGrantNonPartitionedTopic() {
         final String topicName = "non-partitioned-topic";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 8923d01d92b..af36df1690f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.broker.admin.impl.BrokersBase;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -89,9 +90,12 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
 
         int partitions = admin.topics().getPartitionedTopicMetadata(
                 String.format("persistent://%s/%s", ns, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions;
-        Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 1);
+        List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(ns,
+                ListTopicsOptions.builder().includeSystemTopic(true).build());
+        Assert.assertEquals(partitionedTopicList.size(), 1);
         Assert.assertEquals(partitions, PARTITIONS);
-        Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
+        Assert.assertEquals(admin.topics().getList(ns, null,
+                ListTopicsOptions.builder().includeSystemTopic(true).build()).size(), PARTITIONS);
         reader.close();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 8f2dacc434f..e1c2ecd4834 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -150,7 +150,7 @@ public class TransactionTest extends TransactionTestBase {
 
         // getList does not include transaction system topic
         List<String> list = admin.topics().getList(NAMESPACE1);
-        assertEquals(list.size(), 4);
+        assertEquals(list.size(), 2);
         list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
 
         try {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java
new file mode 100644
index 00000000000..2fa9c069f8f
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class ListTopicsOptions {
+
+    public static final ListTopicsOptions EMPTY = ListTopicsOptions.builder().build();
+
+    /**
+     * Namespace bundle.
+     */
+    private final String bundle;
+
+    /**
+     * Whether to include system topics in the returned list; when false they are left out.
+     */
+    private final boolean includeSystemTopic;
+
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 508066a4311..97bd9031f6d 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -112,6 +112,13 @@ public interface Topics {
      */
     List<String> getList(String namespace, TopicDomain topicDomain) throws PulsarAdminException;
 
+    /**
+     * @deprecated use {@link #getList(String, TopicDomain, ListTopicsOptions)} instead.
+     */
+    @Deprecated
+    List<String> getList(String namespace, TopicDomain topicDomain, Map<QueryParam, Object> params)
+            throws PulsarAdminException;
+
     /**
      * Get the list of topics under a namespace.
      * <p/>
@@ -130,8 +137,8 @@ public interface Topics {
      *            use {@link TopicDomain#non_persistent} to get non-persistent topics
      *            Use null to get both persistent and non-persistent topics
      *
-     * @param params
-     *            params to filter the results
+     * @param options
+     *            params to query the topics
      *
      * @return a list of topics
      *
@@ -142,7 +149,7 @@ public interface Topics {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    List<String> getList(String namespace, TopicDomain topicDomain, Map<QueryParam, Object> params)
+    List<String> getList(String namespace, TopicDomain topicDomain, ListTopicsOptions options)
             throws PulsarAdminException;
 
     /**
@@ -183,7 +190,12 @@ public interface Topics {
      */
     CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain);
 
-
+    /**
+     * @deprecated use {@link #getListAsync(String, TopicDomain, ListTopicsOptions)} instead.
+     */
+    @Deprecated
+    CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain,
+                                                 Map<QueryParam, Object> params);
     /**
      * Get the list of topics under a namespace asynchronously.
      * <p/>
@@ -202,13 +214,12 @@ public interface Topics {
      *            use {@link TopicDomain#non_persistent} to get non-persistent topics
      *            Use null to get both persistent and non-persistent topics
      *
-     * @param params
-     *            params to filter the results
+     * @param options
+     *            params to get the topics
      *
      * @return a list of topics
      */
-    CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain,
-            Map<QueryParam, Object> params);
+    CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain, ListTopicsOptions options);
 
     /**
      * Get the list of partitioned topics under a namespace.
@@ -249,6 +260,49 @@ public interface Topics {
      */
     CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace);
 
+    /**
+     * Get the list of partitioned topics under a namespace.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>["persistent://my-tenant/my-namespace/topic-1",
+     *  "persistent://my-tenant/my-namespace/topic-2"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param options
+     *            params to get the topics
+     * @return a list of partitioned topics
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getPartitionedTopicList(String namespace, ListTopicsOptions options) throws PulsarAdminException;
+
+    /**
+     * Get the list of partitioned topics under a namespace asynchronously.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>["persistent://my-tenant/my-namespace/topic-1",
+     *  "persistent://my-tenant/my-namespace/topic-2"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param options
+     *            params to get the topics
+     * @return a list of partitioned topics
+     */
+    CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace, ListTopicsOptions options);
+
     /**
      * Get list of topics exist into given bundle.
      *
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 08b7bd5f2e1..2f095c314e8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -43,6 +43,7 @@ import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.pulsar.client.admin.GetStatsOptions;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -136,18 +137,28 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public List<String> getList(String namespace) throws PulsarAdminException {
-        return getList(namespace, null, null);
+        return getList(namespace, null);
     }
 
     @Override
     public List<String> getList(String namespace, TopicDomain topicDomain) throws PulsarAdminException {
-        return getList(namespace, topicDomain, Collections.emptyMap());
+        return getList(namespace, topicDomain, ListTopicsOptions.EMPTY);
     }
 
     @Override
     public List<String> getList(String namespace, TopicDomain topicDomain, Map<QueryParam, Object> params)
             throws PulsarAdminException {
-        return sync(() -> getListAsync(namespace, topicDomain, params));
+        // The map-based overload historically tolerated a null map (the old body checked params != null),
+        // so keep accepting it instead of throwing a NullPointerException.
+        String bundle = params == null ? null : (String) params.get(QueryParam.Bundle);
+        ListTopicsOptions options = ListTopicsOptions.builder().bundle(bundle).build();
+        return getList(namespace, topicDomain, options);
+    }
+
+    @Override
+    public List<String> getList(String namespace, TopicDomain topicDomain, ListTopicsOptions options)
+            throws PulsarAdminException {
+        return sync(() -> getListAsync(namespace, topicDomain, options));
     }
 
     @Override
@@ -157,21 +168,31 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain) {
-        return getListAsync(namespace, topicDomain, Collections.emptyMap());
+        return getListAsync(namespace, topicDomain, ListTopicsOptions.EMPTY);
     }
 
     @Override
     public CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain,
-            Map<QueryParam, Object> params) {
+                                                        Map<QueryParam, Object> params) {
+        // Look up by the enum key (the map is keyed by QueryParam, not by its String value) and keep
+        // tolerating a null map, as the previous map-based implementation did.
+        String bundle = params == null ? null : (String) params.get(QueryParam.Bundle);
+        ListTopicsOptions options = ListTopicsOptions.builder().bundle(bundle).build();
+        return getListAsync(namespace, topicDomain, options);
+    }
+
+    @Override
+    public CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain,
+                                                        ListTopicsOptions options) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget persistentPath = namespacePath("persistent", ns);
         WebTarget nonPersistentPath = namespacePath("non-persistent", ns);
-        if (params != null && !params.isEmpty()) {
-            for (Entry<QueryParam, Object> param : params.entrySet()) {
-                persistentPath = persistentPath.queryParam(param.getKey().value, param.getValue());
-                nonPersistentPath = nonPersistentPath.queryParam(param.getKey().value, param.getValue());
-            }
-        }
+        persistentPath = persistentPath
+                .queryParam("bundle", options.getBundle())
+                .queryParam("includeSystemTopic", options.isIncludeSystemTopic());
+        nonPersistentPath = nonPersistentPath
+                .queryParam("bundle", options.getBundle())
+                .queryParam("includeSystemTopic", options.isIncludeSystemTopic());
         final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
         final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
         if (topicDomain == null || TopicDomain.persistent.equals(topicDomain)) {
@@ -214,14 +235,27 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException {
-        return sync(() -> getPartitionedTopicListAsync(namespace));
+        return getPartitionedTopicList(namespace, ListTopicsOptions.EMPTY);
+    }
+
+    @Override
+    public List<String> getPartitionedTopicList(String namespace, ListTopicsOptions options)
+            throws PulsarAdminException {
+        return sync(() -> getPartitionedTopicListAsync(namespace, options));
     }
 
     @Override
     public CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace) {
+        return getPartitionedTopicListAsync(namespace, ListTopicsOptions.EMPTY);
+    }
+
+    @Override
+    public CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace, ListTopicsOptions options) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget persistentPath = namespacePath("persistent", ns, "partitioned");
         WebTarget nonPersistentPath = namespacePath("non-persistent", ns, "partitioned");
+        persistentPath = persistentPath.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
+        nonPersistentPath = nonPersistentPath.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
         final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
         final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
         asyncGetRequest(persistentPath,
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 299ee713c68..8279fa048b0 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.admin.Bookies;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.Brokers;
 import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.Lookup;
 import org.apache.pulsar.client.admin.Namespaces;
@@ -1316,7 +1317,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).revokePermissions("persistent://myprop/clust/ns1/ds1", "admin");
 
         cmdTopics.run(split("list myprop/clust/ns1"));
-        verify(mockTopics).getList("myprop/clust/ns1", null, null);
+        verify(mockTopics).getList("myprop/clust/ns1", null, ListTopicsOptions.EMPTY);
 
         cmdTopics.run(split("lookup persistent://myprop/clust/ns1/ds1"));
         verify(mockLookup).lookupTopic("persistent://myprop/clust/ns1/ds1");
@@ -1425,7 +1426,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1", new HashMap<>());
 
         cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1"));
-        verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
+        verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1", ListTopicsOptions.EMPTY);
 
         cmdTopics.run(split("update-partitioned-topic persistent://myprop/clust/ns1/ds1 -p 6"));
         verify(mockTopics).updatePartitionedTopic("persistent://myprop/clust/ns1/ds1", 6, false, false);
@@ -1858,7 +1859,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).createPartitionedTopic("non-persistent://myprop/ns1/ds1", 32, null);
 
         topics.run(split("list myprop/ns1"));
-        verify(mockTopics).getList("myprop/ns1", null, null);
+        verify(mockTopics).getList("myprop/ns1", null, ListTopicsOptions.EMPTY);
 
         NonPersistentTopics mockNonPersistentTopics = mock(NonPersistentTopics.class);
         when(admin.nonPersistentTopics()).thenReturn(mockNonPersistentTopics);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9c87c0f0f57..69f2fcc52ee 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -33,7 +33,6 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -46,13 +45,12 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
-import org.apache.pulsar.client.admin.Topics.QueryParam;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -374,11 +372,18 @@ public class CmdTopics extends CmdBase {
                 "--bundle" }, description = "Namespace bundle to get list of topics")
         private String bundle;
 
+        @Parameter(names = { "-ist",
+                "--include-system-topic" }, description = "Include system topic")
+        private boolean includeSystemTopic;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            print(getTopics().getList(namespace, topicDomain,
-                    StringUtils.isNotBlank(bundle) ? Collections.singletonMap(QueryParam.Bundle, bundle) : null));
+            ListTopicsOptions options = ListTopicsOptions.builder()
+                    .bundle(bundle)
+                    .includeSystemTopic(includeSystemTopic)
+                    .build();
+            print(getTopics().getList(namespace, topicDomain, options));
         }
     }
 
@@ -387,10 +392,15 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "tenant/namespace", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-ist",
+                "--include-system-topic" }, description = "Include system topic")
+        private boolean includeSystemTopic;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            print(getTopics().getPartitionedTopicList(namespace));
+            ListTopicsOptions options = ListTopicsOptions.builder().includeSystemTopic(includeSystemTopic).build();
+            print(getTopics().getPartitionedTopicList(namespace, options));
         }
     }
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
index dfef399587d..165efe5ef4f 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
@@ -36,6 +36,7 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.Lookup;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.Schemas;
@@ -113,7 +114,7 @@ public class TestCmdTopics {
 
         Topics topics = mock(Topics.class);
         doReturn(topicList).when(topics).getList(anyString(), any());
-        doReturn(topicList).when(topics).getList(anyString(), any(), any());
+        doReturn(topicList).when(topics).getList(anyString(), any(), any(ListTopicsOptions.class));
 
         PulsarAdmin admin = mock(PulsarAdmin.class);
         when(admin.topics()).thenReturn(topics);