You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/26 08:31:05 UTC
[pulsar] branch master updated: [feature][CLI] Add query options to the get topics in Namespace. (#16167)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c715e08c67 [feature][CLI] Add query options to the get topics in Namespace. (#16167)
6c715e08c67 is described below
commit 6c715e08c67c40221c8c08cbe0b31d3a6aa1bf32
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Jun 26 16:30:58 2022 +0800
[feature][CLI] Add query options to the get topics in Namespace. (#16167)
### Motivation
Since #2025 has added query param - `Mode`, but the CLI is still not supported.
#15410 has introduced to filter system topic.
So this patch is supported the above query params for Namespace#getTopics.
---
.../apache/pulsar/broker/admin/AdminResource.java | 6 +++
.../broker/admin/impl/PersistentTopicsBase.java | 6 ---
.../apache/pulsar/broker/admin/v1/Namespaces.java | 5 ++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 5 ++-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 24 ++++++++++++
.../BrokerServiceAutoTopicCreationTest.java | 12 ++++--
.../client/admin/ListNamespaceTopicsOptions.java | 39 +++++++++++++++++++
.../java/org/apache/pulsar/client/admin/Mode.java | 44 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 45 ++++++++++++++++++++++
.../client/admin/internal/NamespacesImpl.java | 31 +++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 16 +++++++-
12 files changed, 223 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 31c87055838..e76834a8580 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -846,4 +846,10 @@ public abstract class AdminResource extends PulsarWebResource {
protected static String getSubNotFoundErrorMessage(String topic, String subscription) {
return String.format("Subscription %s not found for topic %s", subscription, topic);
}
+
+ protected List<String> filterSystemTopic(List<String> topics, boolean includeSystemTopic) {
+ return topics.stream()
+ .filter(topic -> includeSystemTopic ? true : !pulsar().getBrokerService().isSystemTopic(topic))
+ .collect(Collectors.toList());
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 5ec93241921..c6fe2fef7a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5278,12 +5278,6 @@ public class PersistentTopicsBase extends AdminResource {
}));
}
- protected List<String> filterSystemTopic(List<String> topics, boolean includeSystemTopic) {
- return topics.stream()
- .filter(topic -> includeSystemTopic ? true : !pulsar().getBrokerService().isSystemTopic(topic))
- .collect(Collectors.toList());
- }
-
protected CompletableFuture<Boolean> internalGetSchemaValidationEnforced(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getSchemaValidationEnforced).orElseGet(() -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index beca9529ae0..c04773c222d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -140,12 +140,15 @@ public class Namespaces extends NamespacesBase {
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
- @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
+ @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
+ @ApiParam(value = "Include system topic")
+ @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+ .thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
.thenAccept(response::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 5191a1170ec..89c47225552 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -114,12 +114,15 @@ public class Namespaces extends NamespacesBase {
public void getTopics(@Suspended AsyncResponse response,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
+ @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
+ @ApiParam(value = "Include system topic")
+ @QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(tenant, namespace);
validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+ .thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
.thenAccept(response::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index f568882384a..ef335d3eba2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -61,6 +61,8 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
+import org.apache.pulsar.client.admin.Mode;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
@@ -2509,4 +2511,26 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getSchemaValidationEnforced(topic, false), true)
);
}
+
+ @Test
+ public void testGetNamespaceTopicList() throws Exception {
+ final String persistentTopic = "persistent://prop-xyz/ns1/testGetNamespaceTopicList";
+ final String nonPersistentTopic = "non-persistent://prop-xyz/ns1/non-testGetNamespaceTopicList";
+ final String eventTopic = "persistent://prop-xyz/ns1/__change_events";
+ admin.topics().createNonPartitionedTopic(persistentTopic);
+ Awaitility.await().untilAsserted(() ->
+ admin.namespaces().getTopics("prop-xyz/ns1",
+ ListNamespaceTopicsOptions.builder().mode(Mode.PERSISTENT).includeSystemTopic(true).build())
+ .contains(eventTopic));
+ List<String> notIncludeSystemTopics = admin.namespaces().getTopics("prop-xyz/ns1",
+ ListNamespaceTopicsOptions.builder().includeSystemTopic(false).build());
+ Assert.assertFalse(notIncludeSystemTopics.contains(eventTopic));
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(nonPersistentTopic)
+ .create();
+ List<String> notPersistentTopics = admin.namespaces().getTopics("prop-xyz/ns1",
+ ListNamespaceTopicsOptions.builder().mode(Mode.NON_PERSISTENT).build());
+ Assert.assertTrue(notPersistentTopics.contains(nonPersistentTopic));
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index fde4b703c44..44a52d4b85e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -22,8 +22,11 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
@@ -399,7 +402,8 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
pulsarClient.newProducer().topic(topicString).create();
- assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc",
+ ListNamespaceTopicsOptions.builder().includeSystemTopic(true).build()).contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
@@ -409,9 +413,11 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
- pulsarClient.newProducer().topic(topicString).create();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicString).create();
- assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc",
+ ListNamespaceTopicsOptions.builder().includeSystemTopic(true).build()).contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListNamespaceTopicsOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListNamespaceTopicsOptions.java
new file mode 100644
index 00000000000..8c5bc4b9741
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListNamespaceTopicsOptions.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class ListNamespaceTopicsOptions {
+
+ public static final ListNamespaceTopicsOptions EMPTY = ListNamespaceTopicsOptions.builder().build();
+
+ /**
+ * Set to true to get topics including system topic, otherwise not.
+ */
+ private final boolean includeSystemTopic;
+
+ /**
+ * Allowed topic domain mode (persistent, non_persistent, all).
+ */
+ private final Mode mode;
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Mode.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Mode.java
new file mode 100644
index 00000000000..7f732d9052e
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Mode.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+public enum Mode {
+
+ PERSISTENT(0), NON_PERSISTENT(1), ALL(2),;
+ private final int value;
+ private Mode(int value) {
+ this.value = value;
+ }
+ public int getValue() {
+ return value;
+ }
+ public static Mode valueOf(int n) {
+ switch (n) {
+ case 0 :
+ return PERSISTENT;
+ case 1 :
+ return NON_PERSISTENT;
+ case 2 :
+ return ALL;
+ default :
+ return null;
+
+ }
+ }
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index f279f70839c..39cf49cf961 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -162,6 +162,51 @@ public interface Namespaces {
*/
CompletableFuture<List<String>> getTopicsAsync(String namespace);
+ /**
+ * Get the list of topics.
+ * <p/>
+ * Get the list of all the topics under a certain namespace.
+ * <p/>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["persistent://my-tenant/use/namespace1/my-topic-1",
+ * "persistent://my-tenant/use/namespace1/my-topic-2"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param options
+ * List namespace topics options
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ List<String> getTopics(String namespace, ListNamespaceTopicsOptions options) throws PulsarAdminException;
+
+ /**
+ * Get the list of topics asynchronously.
+ * <p/>
+ * Get the list of all the topics under a certain namespace.
+ * <p/>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["persistent://my-tenant/use/namespace1/my-topic-1",
+ * "persistent://my-tenant/use/namespace1/my-topic-2"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param options
+ * List namespace topics options
+ */
+ CompletableFuture<List<String>> getTopicsAsync(String namespace, ListNamespaceTopicsOptions options);
+
/**
* Get the list of bundles.
* <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 2058be5c199..d9672b9e196 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -29,6 +29,7 @@ import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
@@ -169,6 +170,36 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
return future;
}
+ @Override
+ public List<String> getTopics(String namespace, ListNamespaceTopicsOptions options)
+ throws PulsarAdminException{
+ return sync(() -> getTopicsAsync(namespace, options));
+ }
+
+ @Override
+ public CompletableFuture<List<String>> getTopicsAsync(String namespace, ListNamespaceTopicsOptions options) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ String action = ns.isV2() ? "topics" : "destinations";
+ WebTarget path = namespacePath(ns, action);
+ path = path
+ .queryParam("mode", options.getMode())
+ .queryParam("includeSystemTopic", options.isIncludeSystemTopic());
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> topics) {
+ future.complete(topics);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
@Override
public Policies getPolicies(String namespace) throws PulsarAdminException {
return sync(() -> getPoliciesAsync(namespace));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7a805eb6659..e63509854e4 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.admin.Bookies;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.Brokers;
import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.Lookup;
@@ -344,7 +345,7 @@ public class PulsarAdminToolTest {
verify(mockNamespaces).getNamespaces("myprop", "clust");
namespaces.run(split("topics myprop/clust/ns1"));
- verify(mockNamespaces).getTopics("myprop/clust/ns1");
+ verify(mockNamespaces).getTopics("myprop/clust/ns1", ListNamespaceTopicsOptions.builder().build());
namespaces.run(split("policies myprop/clust/ns1"));
verify(mockNamespaces).getPolicies("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index ed0980b3ed6..8b4679e43c5 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -37,6 +37,8 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.admin.cli.utils.IOUtils;
+import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
+import org.apache.pulsar.client.admin.Mode;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -96,10 +98,22 @@ public class CmdNamespaces extends CmdBase {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
+ @Parameter(names = {"-m", "--mode"},
+ description = "Allowed topic domain mode (persistent, non_persistent, all).")
+ private Mode mode;
+
+ @Parameter(names = { "-ist",
+ "--include-system-topic" }, description = "Include system topic")
+ private boolean includeSystemTopic;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
- print(getAdmin().namespaces().getTopics(namespace));
+ ListNamespaceTopicsOptions options = ListNamespaceTopicsOptions.builder()
+ .mode(mode)
+ .includeSystemTopic(includeSystemTopic)
+ .build();
+ print(getAdmin().namespaces().getTopics(namespace, options));
}
}