You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/25 15:56:26 UTC
[pulsar] branch master updated: Support get persistent topics or
non-persistent topics for pulsar admin client (#9877)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 581fd5b Support get persistent topics or non-persistent topics for pulsar admin client (#9877)
581fd5b is described below
commit 581fd5b01e6775db1429f645ffe2ce7d984c2877
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Apr 25 23:55:45 2021 +0800
Support get persistent topics or non-persistent topics for pulsar admin client (#9877)
### Motivation
Currently, the admin client can only list all topics in a namespace. This PR adds support for listing only persistent or only non-persistent topics by passing `--topic-domain=persistent` or `--topic-domain=non_persistent`.
Pulsar SQL should fetch only the persistent topics when listing tables. I will push a follow-up PR to fix Pulsar SQL.
### Verifying this change
Unit test and integration test added.
---
.../apache/pulsar/broker/admin/AdminApiTest2.java | 45 ++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 56 ++++++++++++++-
.../pulsar/client/admin/internal/TopicsImpl.java | 66 ++++++++++++------
.../pulsar/admin/cli/PulsarAdminToolTest.java | 2 +-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 7 +-
.../pulsar/tests/integration/cli/CLITest.java | 81 ++++++++++++++++++++++
6 files changed, 229 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 12e2176..1e47756 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
@@ -1959,4 +1960,48 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
verify(mockTopic, times(2)).triggerCompaction();
}
+ @Test
+ public void testGetTopicsWithDifferentMode() throws Exception {
+ final String namespace = "prop-xyz/ns1";
+
+ final String persistentTopicName = TopicName.get(
+ "persistent",
+ NamespaceName.get(namespace),
+ "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+
+ final String nonPersistentTopicName = TopicName.get(
+ "non-persistent",
+ NamespaceName.get(namespace),
+ "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+
+ Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistentTopicName).create();
+ Producer<byte[]> producer2 = pulsarClient.newProducer().topic(nonPersistentTopicName).create();
+
+ List<String> topics = new ArrayList<>(admin.topics().getList(namespace));
+ assertEquals(topics.size(), 2);
+ assertTrue(topics.contains(persistentTopicName));
+ assertTrue(topics.contains(nonPersistentTopicName));
+
+ topics.clear();
+
+ topics.addAll(admin.topics().getList(namespace, TopicDomain.persistent));
+ assertEquals(topics.size(), 1);
+ assertTrue(topics.contains(persistentTopicName));
+
+ topics.clear();
+
+ topics.addAll(admin.topics().getList(namespace, TopicDomain.non_persistent));
+ assertEquals(topics.size(), 1);
+ assertTrue(topics.contains(nonPersistentTopicName));
+
+ try {
+ admin.topics().getList(namespace, TopicDomain.getEnum("none"));
+ fail("Should failed with invalid get topic mode.");
+ } catch (IllegalArgumentException e) {
+ assertEquals(e.getMessage(), "Invalid topic domain: 'none'");
+ }
+
+ producer1.close();
+ producer2.close();
+ }
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index b1156f4..e4461db 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -52,7 +53,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
public interface Topics {
/**
- * Get the list of topics under a namespace.
+ * Get the both persistent and non-persistent topics under a namespace.
* <p/>
* Response example:
*
@@ -75,7 +76,36 @@ public interface Topics {
List<String> getList(String namespace) throws PulsarAdminException;
/**
- * Get the list of topics under a namespace asynchronously.
+ * Get the list of topics under a namespace.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>["topic://my-tenant/my-namespace/topic-1",
+ * "topic://my-tenant/my-namespace/topic-2"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @param topicDomain
+ * use {@link TopicDomain#persistent} to get persistent topics
+ * use {@link TopicDomain#non_persistent} to get non-persistent topics
+ * Use null to get both persistent and non-persistent topics
+ *
+ * @return a list of topics
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ List<String> getList(String namespace, TopicDomain topicDomain) throws PulsarAdminException;
+
+ /**
+ * Get both persistent and non-persistent topics under a namespace asynchronously.
* <p/>
* Response example:
*
@@ -91,6 +121,28 @@ public interface Topics {
CompletableFuture<List<String>> getListAsync(String namespace);
/**
+ * Get the list of topics under a namespace asynchronously.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>["topic://my-tenant/my-namespace/topic-1",
+ * "topic://my-tenant/my-namespace/topic-2"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @param topicDomain
+ * use {@link TopicDomain#persistent} to get persistent topics
+ * use {@link TopicDomain#non_persistent} to get non-persistent topics
+ * Use null to get both persistent and non-persistent topics
+ *
+ * @return a list of topics
+ */
+ CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain);
+
+ /**
* Get the list of partitioned topics under a namespace.
* <p/>
* Response example:
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 7a82f54..1a2f777 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -104,8 +105,13 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public List<String> getList(String namespace) throws PulsarAdminException {
+ return getList(namespace, null);
+ }
+
+ @Override
+ public List<String> getList(String namespace, TopicDomain topicDomain) throws PulsarAdminException {
try {
- return getListAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ return getListAsync(namespace, topicDomain).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
@@ -118,35 +124,49 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public CompletableFuture<List<String>> getListAsync(String namespace) {
+ return getListAsync(namespace, null);
+ }
+
+ @Override
+ public CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget persistentPath = namespacePath("persistent", ns);
WebTarget nonPersistentPath = namespacePath("non-persistent", ns);
final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
- asyncGetRequest(persistentPath,
- new InvocationCallback<List<String>>() {
- @Override
- public void completed(List<String> topics) {
- persistentList.complete(topics);
- }
+ if (topicDomain == null || TopicDomain.persistent.equals(topicDomain)) {
+ asyncGetRequest(persistentPath,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> topics) {
+ persistentList.complete(topics);
+ }
- @Override
- public void failed(Throwable throwable) {
- persistentList.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- asyncGetRequest(nonPersistentPath,
- new InvocationCallback<List<String>>() {
- @Override
- public void completed(List<String> a) {
- nonPersistentList.complete(a);
- }
+ @Override
+ public void failed(Throwable throwable) {
+ persistentList.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ } else {
+ persistentList.complete(Collections.emptyList());
+ }
- @Override
- public void failed(Throwable throwable) {
- nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
+ if (topicDomain == null || TopicDomain.non_persistent.equals(topicDomain)) {
+ asyncGetRequest(nonPersistentPath,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> a) {
+ nonPersistentList.complete(a);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ } else {
+ nonPersistentList.complete(Collections.emptyList());
+ }
return persistentList.thenCombine(nonPersistentList,
(l1, l2) -> new ArrayList<>(Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toSet())));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7159ef7..2b1edd8 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -786,7 +786,7 @@ public class PulsarAdminToolTest {
verify(mockTopics).revokePermissions("persistent://myprop/clust/ns1/ds1", "admin");
cmdTopics.run(split("list myprop/clust/ns1"));
- verify(mockTopics).getList("myprop/clust/ns1");
+ verify(mockTopics).getList("myprop/clust/ns1", null);
cmdTopics.run(split("lookup persistent://myprop/clust/ns1/ds1"));
verify(mockLookup).lookupTopic("persistent://myprop/clust/ns1/ds1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 85a83c9..92c3955 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -44,7 +44,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -54,6 +53,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
@@ -251,10 +251,13 @@ public class CmdTopics extends CmdBase {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
+ @Parameter(names = {"-td", "--topic-domain"}, description = "Allowed topic domain (persistent, non_persistent).")
+ private TopicDomain topicDomain;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
- print(getTopics().getList(namespace));
+ print(getTopics().getList(namespace, topicDomain));
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index c57b205..e9af5b6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.concurrent.TimeUnit;
+import java.util.UUID;
import lombok.Cleanup;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -35,6 +36,8 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.examples.pojo.Tick;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -72,6 +75,84 @@ public class CLITest extends PulsarTestSuite {
}
@Test
+ public void testGetTopicListCommand() throws Exception {
+ ContainerExecResult result;
+
+ final String namespaceLocalName = "list-topics-" + randomName(8);
+ result = pulsarCluster.createNamespace(namespaceLocalName);
+ final String namespace = "public/" + namespaceLocalName;
+ assertEquals(0, result.getExitCode());
+
+ PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+
+ final String persistentTopicName = TopicName.get(
+ "persistent",
+ NamespaceName.get(namespace),
+ "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+
+ final String nonPersistentTopicName = TopicName.get(
+ "non-persistent",
+ NamespaceName.get(namespace),
+ "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+
+ Producer<byte[]> producer1 = client.newProducer()
+ .topic(persistentTopicName)
+ .create();
+
+ Producer<byte[]> producer2 = client.newProducer()
+ .topic(nonPersistentTopicName)
+ .create();
+
+ BrokerContainer container = pulsarCluster.getAnyBroker();
+
+ result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "list",
+ namespace);
+
+ assertTrue(result.getStdout().contains(persistentTopicName));
+ assertTrue(result.getStdout().contains(nonPersistentTopicName));
+
+ result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "list",
+ "--topic-domain",
+ "persistent",
+ namespace);
+
+ assertTrue(result.getStdout().contains(persistentTopicName));
+ assertFalse(result.getStdout().contains(nonPersistentTopicName));
+
+ result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "list",
+ "--topic-domain",
+ "non_persistent",
+ namespace);
+
+ assertFalse(result.getStdout().contains(persistentTopicName));
+ assertTrue(result.getStdout().contains(nonPersistentTopicName));
+
+ try {
+ container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "list",
+ "--topic-domain",
+ "none",
+ namespace);
+ fail();
+ } catch (ContainerExecException ignore) {
+ }
+
+ producer1.close();
+ producer2.close();
+ }
+
+ @Test
public void testCreateSubscriptionCommand() throws Exception {
String topic = "testCreateSubscriptionCommmand";