You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/08 04:46:05 UTC
[pulsar] branch master updated: [improve][CLI] Support filtering system topic when get list. (#15410)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a0dcab679b1 [improve][CLI] Support filtering system topic when get list. (#15410)
a0dcab679b1 is described below
commit a0dcab679b1842a2efee4df2f31bf41ef5ab1dd0
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun May 8 12:45:54 2022 +0800
[improve][CLI] Support filtering system topic when get list. (#15410)
---
.../broker/admin/impl/PersistentTopicsBase.java | 6 ++
.../broker/admin/v2/NonPersistentTopics.java | 6 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 12 ++--
.../pulsar/broker/admin/PersistentTopicsTest.java | 61 ++++++++++++++++++-
.../systopic/PartitionedSystemTopicTest.java | 8 ++-
.../pulsar/broker/transaction/TransactionTest.java | 2 +-
.../pulsar/client/admin/ListTopicsOptions.java | 40 +++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 70 +++++++++++++++++++---
.../pulsar/client/admin/internal/TopicsImpl.java | 58 ++++++++++++++----
.../pulsar/admin/cli/PulsarAdminToolTest.java | 7 ++-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 22 +++++--
.../org/apache/pulsar/admin/cli/TestCmdTopics.java | 3 +-
12 files changed, 254 insertions(+), 41 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 847406dfc89..033aa41cb52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4990,4 +4990,10 @@ public class PersistentTopicsBase extends AdminResource {
.updateTopicPoliciesAsync(topicName, topicPolicies);
}));
}
+
+ protected List<String> filterSystemTopic(List<String> topics, boolean includeSystemTopic) {
+ return topics.stream()
+ .filter(topic -> includeSystemTopic ? true : !pulsar().getBrokerService().isSystemTopic(topic))
+ .collect(Collectors.toList());
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 5cf78278350..ec88906cc94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -378,7 +378,9 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify the bundle name", required = false)
- @QueryParam("bundle") String nsBundle) {
+ @QueryParam("bundle") String nsBundle,
+ @ApiParam(value = "Include system topic")
+ @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
Policies policies = null;
try {
validateNamespaceName(tenant, namespace);
@@ -430,7 +432,7 @@ public class NonPersistentTopics extends PersistentTopics {
topics.stream()
.filter(name -> !TopicName.get(name).isPersistent())
.collect(Collectors.toList());
- asyncResponse.resume(nonPersistentTopics);
+ asyncResponse.resume(filterSystemTopic(nonPersistentTopics, includeSystemTopic));
}
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index b6424f5c425..953b0f5f821 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -99,10 +99,12 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify the bundle name", required = false)
- @QueryParam("bundle") String bundle) {
+ @QueryParam("bundle") String bundle,
+ @ApiParam(value = "Include system topic")
+ @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
try {
validateNamespaceName(tenant, namespace);
- asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
+ asyncResponse.resume(filterSystemTopic(internalGetList(Optional.ofNullable(bundle)), includeSystemTopic));
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
@@ -124,9 +126,11 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
- @PathParam("namespace") String namespace) {
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Include system topic")
+ @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(tenant, namespace);
- return internalGetPartitionedTopicList();
+ return filterSystemTopic(internalGetPartitionedTopicList(), includeSystemTopic);
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index b1bc4ab5486..330d945f651 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -152,7 +152,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
admin.clusters().createCluster("test",ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
admin.tenants().createTenant(this.testTenant,
new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test")));
+ admin.tenants().createTenant("pulsar",
+ new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test")));
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet(testLocalCluster, "test"));
+ admin.namespaces().createNamespace("pulsar/system", 4);
}
@Override
@@ -626,16 +629,70 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
- List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "__change_events", 3, true);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+ List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, false);
Assert.assertEquals(persistentPartitionedTopics.size(), 1);
Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.persistent.value());
+ persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, true);
+ Assert.assertEquals(persistentPartitionedTopics.size(), 2);
- List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace);
+ List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace, false);
Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
}
+ @Test
+ public void testGetList() throws Exception {
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic-1", 1, true);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "__change_events", 1, true);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getList(response, testTenant, testNamespace, null, false);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ List<String> topics = (List<String>) responseCaptor.getValue();
+ Assert.assertEquals(topics.size(), 1);
+
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getList(response, testTenant, testNamespace, null, true);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ topics = (List<String>) responseCaptor.getValue();
+ Assert.assertEquals(topics.size(), 2);
+
+ nonPersistentTopic.createNonPartitionedTopic(testTenant, testNamespace, "test-topic-2", false, null);
+ nonPersistentTopic.createNonPartitionedTopic(testTenant, testNamespace, "__change_events", false, null);
+
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ nonPersistentTopic.getList(response, testTenant, testNamespace, null, false);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ topics = (List<String>) responseCaptor.getValue();
+ Assert.assertEquals(topics.size(), 1);
+
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ nonPersistentTopic.getList(response, testTenant, testNamespace, null, true);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ topics = (List<String>) responseCaptor.getValue();
+ Assert.assertEquals(topics.size(), 2);
+ }
+
@Test
public void testGrantNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 8923d01d92b..af36df1690f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -89,9 +90,12 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
int partitions = admin.topics().getPartitionedTopicMetadata(
String.format("persistent://%s/%s", ns, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions;
- Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 1);
+ List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(ns,
+ ListTopicsOptions.builder().includeSystemTopic(true).build());
+ Assert.assertEquals(partitionedTopicList.size(), 1);
Assert.assertEquals(partitions, PARTITIONS);
- Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
+ Assert.assertEquals(admin.topics().getList(ns, null,
+ ListTopicsOptions.builder().includeSystemTopic(true).build()).size(), PARTITIONS);
reader.close();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 8f2dacc434f..e1c2ecd4834 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -150,7 +150,7 @@ public class TransactionTest extends TransactionTestBase {
// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
- assertEquals(list.size(), 4);
+ assertEquals(list.size(), 2);
list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
try {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java
new file mode 100644
index 00000000000..2fa9c069f8f
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class ListTopicsOptions {
+
+ public static final ListTopicsOptions EMPTY = ListTopicsOptions.builder().build();
+
+ /**
+ * Namespace bundle.
+ */
+ private final String bundle;
+
+ /**
+ * Set to true to get topics including system topic, otherwise not.
+ */
+ private final boolean includeSystemTopic;
+
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 508066a4311..97bd9031f6d 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -112,6 +112,13 @@ public interface Topics {
*/
List<String> getList(String namespace, TopicDomain topicDomain) throws PulsarAdminException;
+ /**
+ * @deprecated use {@link #getList(String, TopicDomain, ListTopicsOptions)} instead.
+ */
+ @Deprecated
+ List<String> getList(String namespace, TopicDomain topicDomain, Map<QueryParam, Object> params)
+ throws PulsarAdminException;
+
/**
* Get the list of topics under a namespace.
* <p/>
@@ -130,8 +137,8 @@ public interface Topics {
* use {@link TopicDomain#non_persistent} to get non-persistent topics
* Use null to get both persistent and non-persistent topics
*
- * @param params
- * params to filter the results
+ * @param options
+ * params to query the topics
*
* @return a list of topics
*
@@ -142,7 +149,7 @@ public interface Topics {
* @throws PulsarAdminException
* Unexpected error
*/
- List<String> getList(String namespace, TopicDomain topicDomain, Map<QueryParam, Object> params)
+ List<String> getList(String namespace, TopicDomain topicDomain, ListTopicsOptions options)
throws PulsarAdminException;
/**
@@ -183,7 +190,12 @@ public interface Topics {
*/
CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain);
-
+ /**
+ * @deprecated use {@link #getListAsync(String, TopicDomain, ListTopicsOptions)} instead.
+ */
+ @Deprecated
+ CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain,
+ Map<QueryParam, Object> params);
/**
* Get the list of topics under a namespace asynchronously.
* <p/>
@@ -202,13 +214,12 @@ public interface Topics {
* use {@link TopicDomain#non_persistent} to get non-persistent topics
* Use null to get both persistent and non-persistent topics
*
- * @param params
- * params to filter the results
+ * @param options
+ * params to get the topics
*
* @return a list of topics
*/
- CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain,
- Map<QueryParam, Object> params);
+ CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain, ListTopicsOptions options);
/**
* Get the list of partitioned topics under a namespace.
@@ -249,6 +260,49 @@ public interface Topics {
*/
CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace);
+ /**
+ * Get the list of partitioned topics under a namespace.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>["persistent://my-tenant/my-namespace/topic-1",
+ * "persistent://my-tenant/my-namespace/topic-2"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param options
+ * params to get the topics
+ * @return a list of partitioned topics
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ List<String> getPartitionedTopicList(String namespace, ListTopicsOptions options) throws PulsarAdminException;
+
+ /**
+ * Get the list of partitioned topics under a namespace asynchronously.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>["persistent://my-tenant/my-namespace/topic-1",
+ * "persistent://my-tenant/my-namespace/topic-2"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param options
+ * params to filter the results
+ * @return a list of partitioned topics
+ */
+ CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace, ListTopicsOptions options);
+
/**
* Get list of topics exist into given bundle.
*
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 08b7bd5f2e1..2f095c314e8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -43,6 +43,7 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.client.admin.GetStatsOptions;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -136,18 +137,28 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public List<String> getList(String namespace) throws PulsarAdminException {
- return getList(namespace, null, null);
+ return getList(namespace, null);
}
@Override
public List<String> getList(String namespace, TopicDomain topicDomain) throws PulsarAdminException {
- return getList(namespace, topicDomain, Collections.emptyMap());
+ return getList(namespace, topicDomain, ListTopicsOptions.EMPTY);
}
@Override
public List<String> getList(String namespace, TopicDomain topicDomain, Map<QueryParam, Object> params)
throws PulsarAdminException {
- return sync(() -> getListAsync(namespace, topicDomain, params));
+ ListTopicsOptions options = ListTopicsOptions
+ .builder()
+ .bundle((String) params.get(QueryParam.Bundle))
+ .build();
+ return getList(namespace, topicDomain, options);
+ }
+
+ @Override
+ public List<String> getList(String namespace, TopicDomain topicDomain, ListTopicsOptions options)
+ throws PulsarAdminException {
+ return sync(() -> getListAsync(namespace, topicDomain, options));
}
@Override
@@ -157,21 +168,31 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain) {
- return getListAsync(namespace, topicDomain, Collections.emptyMap());
+ return getListAsync(namespace, topicDomain, ListTopicsOptions.EMPTY);
}
@Override
public CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain,
- Map<QueryParam, Object> params) {
+ Map<QueryParam, Object> params) {
+ ListTopicsOptions options = ListTopicsOptions
+ .builder()
+ .bundle((String) params.get(QueryParam.Bundle.value))
+ .build();
+ return getListAsync(namespace, topicDomain, options);
+ }
+
+ @Override
+ public CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain,
+ ListTopicsOptions options) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget persistentPath = namespacePath("persistent", ns);
WebTarget nonPersistentPath = namespacePath("non-persistent", ns);
- if (params != null && !params.isEmpty()) {
- for (Entry<QueryParam, Object> param : params.entrySet()) {
- persistentPath = persistentPath.queryParam(param.getKey().value, param.getValue());
- nonPersistentPath = nonPersistentPath.queryParam(param.getKey().value, param.getValue());
- }
- }
+ persistentPath = persistentPath
+ .queryParam("bundle", options.getBundle())
+ .queryParam("includeSystemTopic", options.isIncludeSystemTopic());
+ nonPersistentPath = nonPersistentPath
+ .queryParam("bundle", options.getBundle())
+ .queryParam("includeSystemTopic", options.isIncludeSystemTopic());
final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
if (topicDomain == null || TopicDomain.persistent.equals(topicDomain)) {
@@ -214,14 +235,27 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException {
- return sync(() -> getPartitionedTopicListAsync(namespace));
+ return getPartitionedTopicList(namespace, ListTopicsOptions.EMPTY);
+ }
+
+ @Override
+ public List<String> getPartitionedTopicList(String namespace, ListTopicsOptions options)
+ throws PulsarAdminException {
+ return sync(() -> getPartitionedTopicListAsync(namespace, options));
}
@Override
public CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace) {
+ return getPartitionedTopicListAsync(namespace, ListTopicsOptions.EMPTY);
+ }
+
+ @Override
+ public CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace, ListTopicsOptions options) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget persistentPath = namespacePath("persistent", ns, "partitioned");
WebTarget nonPersistentPath = namespacePath("non-persistent", ns, "partitioned");
+ persistentPath = persistentPath.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
+ nonPersistentPath = nonPersistentPath.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
asyncGetRequest(persistentPath,
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 299ee713c68..8279fa048b0 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.admin.Bookies;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.Brokers;
import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.Namespaces;
@@ -1316,7 +1317,7 @@ public class PulsarAdminToolTest {
verify(mockTopics).revokePermissions("persistent://myprop/clust/ns1/ds1", "admin");
cmdTopics.run(split("list myprop/clust/ns1"));
- verify(mockTopics).getList("myprop/clust/ns1", null, null);
+ verify(mockTopics).getList("myprop/clust/ns1", null, ListTopicsOptions.EMPTY);
cmdTopics.run(split("lookup persistent://myprop/clust/ns1/ds1"));
verify(mockLookup).lookupTopic("persistent://myprop/clust/ns1/ds1");
@@ -1425,7 +1426,7 @@ public class PulsarAdminToolTest {
verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1", new HashMap<>());
cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1"));
- verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
+ verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1", ListTopicsOptions.EMPTY);
cmdTopics.run(split("update-partitioned-topic persistent://myprop/clust/ns1/ds1 -p 6"));
verify(mockTopics).updatePartitionedTopic("persistent://myprop/clust/ns1/ds1", 6, false, false);
@@ -1858,7 +1859,7 @@ public class PulsarAdminToolTest {
verify(mockTopics).createPartitionedTopic("non-persistent://myprop/ns1/ds1", 32, null);
topics.run(split("list myprop/ns1"));
- verify(mockTopics).getList("myprop/ns1", null, null);
+ verify(mockTopics).getList("myprop/ns1", null, ListTopicsOptions.EMPTY);
NonPersistentTopics mockNonPersistentTopics = mock(NonPersistentTopics.class);
when(admin.nonPersistentTopics()).thenReturn(mockNonPersistentTopics);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9c87c0f0f57..69f2fcc52ee 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -33,7 +33,6 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -46,13 +45,12 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
-import org.apache.pulsar.client.admin.Topics.QueryParam;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -374,11 +372,18 @@ public class CmdTopics extends CmdBase {
"--bundle" }, description = "Namespace bundle to get list of topics")
private String bundle;
+ @Parameter(names = { "-ist",
+ "--include-system-topic" }, description = "Include system topic")
+ private boolean includeSystemTopic;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
- print(getTopics().getList(namespace, topicDomain,
- StringUtils.isNotBlank(bundle) ? Collections.singletonMap(QueryParam.Bundle, bundle) : null));
+ ListTopicsOptions options = ListTopicsOptions.builder()
+ .bundle(bundle)
+ .includeSystemTopic(includeSystemTopic)
+ .build();
+ print(getTopics().getList(namespace, topicDomain, options));
}
}
@@ -387,10 +392,15 @@ public class CmdTopics extends CmdBase {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
+ @Parameter(names = { "-ist",
+ "--include-system-topic" }, description = "Include system topic")
+ private boolean includeSystemTopic;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
- print(getTopics().getPartitionedTopicList(namespace));
+ ListTopicsOptions options = ListTopicsOptions.builder().includeSystemTopic(includeSystemTopic).build();
+ print(getTopics().getPartitionedTopicList(namespace, options));
}
}
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
index dfef399587d..165efe5ef4f 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
@@ -36,6 +36,7 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Schemas;
@@ -113,7 +114,7 @@ public class TestCmdTopics {
Topics topics = mock(Topics.class);
doReturn(topicList).when(topics).getList(anyString(), any());
- doReturn(topicList).when(topics).getList(anyString(), any(), any());
+ doReturn(topicList).when(topics).getList(anyString(), any(), any(ListTopicsOptions.class));
PulsarAdmin admin = mock(PulsarAdmin.class);
when(admin.topics()).thenReturn(topics);