You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/25 15:56:26 UTC

[pulsar] branch master updated: Support get persistent topics or non-persistent topics for pulsar admin client (#9877)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 581fd5b  Support get persistent topics or non-persistent topics for pulsar admin client (#9877)
581fd5b is described below

commit 581fd5b01e6775db1429f645ffe2ce7d984c2877
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Apr 25 23:55:45 2021 +0800

    Support get persistent topics or non-persistent topics for pulsar admin client (#9877)
    
    ### Motivation
    
    Currently, the admin client can only list all topics in a namespace. This PR adds support for listing only persistent or only non-persistent topics by passing `--topic-domain persistent` or `--topic-domain non_persistent`.
    
    Pulsar SQL should fetch only the persistent topics when listing tables. I will fix Pulsar SQL in a follow-up PR.
    
    ### Verifying this change
    
    Unit test and integration test added.
---
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 45 ++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 56 ++++++++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   | 66 ++++++++++++------
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  2 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  7 +-
 .../pulsar/tests/integration/cli/CLITest.java      | 81 ++++++++++++++++++++++
 6 files changed, 229 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 12e2176..1e47756 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
@@ -1959,4 +1960,48 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         verify(mockTopic, times(2)).triggerCompaction();
     }
 
+    @Test
+    public void testGetTopicsWithDifferentMode() throws Exception {
+        final String namespace = "prop-xyz/ns1";
+
+        final String persistentTopicName = TopicName.get(
+                "persistent",
+                NamespaceName.get(namespace),
+                "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+
+        final String nonPersistentTopicName = TopicName.get(
+                "non-persistent",
+                NamespaceName.get(namespace),
+                "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistentTopicName).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(nonPersistentTopicName).create();
+
+        List<String> topics = new ArrayList<>(admin.topics().getList(namespace));
+        assertEquals(topics.size(), 2);
+        assertTrue(topics.contains(persistentTopicName));
+        assertTrue(topics.contains(nonPersistentTopicName));
+
+        topics.clear();
+
+        topics.addAll(admin.topics().getList(namespace, TopicDomain.persistent));
+        assertEquals(topics.size(), 1);
+        assertTrue(topics.contains(persistentTopicName));
+
+        topics.clear();
+
+        topics.addAll(admin.topics().getList(namespace, TopicDomain.non_persistent));
+        assertEquals(topics.size(), 1);
+        assertTrue(topics.contains(nonPersistentTopicName));
+
+        try {
+            admin.topics().getList(namespace, TopicDomain.getEnum("none"));
+            fail("Should failed with invalid get topic mode.");
+        } catch (IllegalArgumentException e) {
+            assertEquals(e.getMessage(), "Invalid topic domain: 'none'");
+        }
+
+        producer1.close();
+        producer2.close();
+    }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index b1156f4..e4461db 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -52,7 +53,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 public interface Topics {
 
     /**
-     * Get the list of topics under a namespace.
+     * Get both persistent and non-persistent topics under a namespace.
      * <p/>
      * Response example:
      *
@@ -75,7 +76,36 @@ public interface Topics {
     List<String> getList(String namespace) throws PulsarAdminException;
 
     /**
-     * Get the list of topics under a namespace asynchronously.
+     * Get the list of topics under a namespace.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>["topic://my-tenant/my-namespace/topic-1",
+     *  "topic://my-tenant/my-namespace/topic-2"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @param topicDomain
+     *            use {@link TopicDomain#persistent} to get persistent topics
+     *            use {@link TopicDomain#non_persistent} to get non-persistent topics
+     *            Use null to get both persistent and non-persistent topics
+     *
+     * @return a list of topics
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getList(String namespace, TopicDomain topicDomain) throws PulsarAdminException;
+
+    /**
+     * Get both persistent and non-persistent topics under a namespace asynchronously.
      * <p/>
      * Response example:
      *
@@ -91,6 +121,28 @@ public interface Topics {
     CompletableFuture<List<String>> getListAsync(String namespace);
 
     /**
+     * Get the list of topics under a namespace asynchronously.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>["topic://my-tenant/my-namespace/topic-1",
+     *  "topic://my-tenant/my-namespace/topic-2"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @param topicDomain
+     *            use {@link TopicDomain#persistent} to get persistent topics
+     *            use {@link TopicDomain#non_persistent} to get non-persistent topics
+     *            Use null to get both persistent and non-persistent topics
+     *
+     * @return a list of topics
+     */
+    CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain);
+
+    /**
      * Get the list of partitioned topics under a namespace.
      * <p/>
      * Response example:
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 7a82f54..1a2f777 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -104,8 +105,13 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public List<String> getList(String namespace) throws PulsarAdminException {
+        return getList(namespace, null);
+    }
+
+    @Override
+    public List<String> getList(String namespace, TopicDomain topicDomain) throws PulsarAdminException {
         try {
-            return getListAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+            return getListAsync(namespace, topicDomain).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -118,35 +124,49 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public CompletableFuture<List<String>> getListAsync(String namespace) {
+        return getListAsync(namespace, null);
+    }
+
+    @Override
+    public CompletableFuture<List<String>> getListAsync(String namespace, TopicDomain topicDomain) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget persistentPath = namespacePath("persistent", ns);
         WebTarget nonPersistentPath = namespacePath("non-persistent", ns);
         final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
         final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
-        asyncGetRequest(persistentPath,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> topics) {
-                        persistentList.complete(topics);
-                    }
+        if (topicDomain == null || TopicDomain.persistent.equals(topicDomain)) {
+            asyncGetRequest(persistentPath,
+                    new InvocationCallback<List<String>>() {
+                        @Override
+                        public void completed(List<String> topics) {
+                            persistentList.complete(topics);
+                        }
 
-                    @Override
-                    public void failed(Throwable throwable) {
-                        persistentList.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        asyncGetRequest(nonPersistentPath,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> a) {
-                        nonPersistentList.complete(a);
-                    }
+                        @Override
+                        public void failed(Throwable throwable) {
+                            persistentList.completeExceptionally(getApiException(throwable.getCause()));
+                        }
+                    });
+        } else {
+            persistentList.complete(Collections.emptyList());
+        }
 
-                    @Override
-                    public void failed(Throwable throwable) {
-                        nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
+        if (topicDomain == null || TopicDomain.non_persistent.equals(topicDomain)) {
+            asyncGetRequest(nonPersistentPath,
+                    new InvocationCallback<List<String>>() {
+                        @Override
+                        public void completed(List<String> a) {
+                            nonPersistentList.complete(a);
+                        }
+
+                        @Override
+                        public void failed(Throwable throwable) {
+                            nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
+                        }
+                    });
+        } else {
+            nonPersistentList.complete(Collections.emptyList());
+        }
 
         return persistentList.thenCombine(nonPersistentList,
                 (l1, l2) -> new ArrayList<>(Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toSet())));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7159ef7..2b1edd8 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -786,7 +786,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).revokePermissions("persistent://myprop/clust/ns1/ds1", "admin");
 
         cmdTopics.run(split("list myprop/clust/ns1"));
-        verify(mockTopics).getList("myprop/clust/ns1");
+        verify(mockTopics).getList("myprop/clust/ns1", null);
 
         cmdTopics.run(split("lookup persistent://myprop/clust/ns1/ds1"));
         verify(mockLookup).lookupTopic("persistent://myprop/clust/ns1/ds1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 85a83c9..92c3955 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -44,7 +44,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -54,6 +53,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
@@ -251,10 +251,13 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "tenant/namespace", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = {"-td", "--topic-domain"}, description = "Allowed topic domain (persistent, non_persistent).")
+        private TopicDomain topicDomain;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            print(getTopics().getList(namespace));
+            print(getTopics().getList(namespace, topicDomain));
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index c57b205..e9af5b6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.util.concurrent.TimeUnit;
+import java.util.UUID;
 import lombok.Cleanup;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -35,6 +36,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.examples.pojo.Tick;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -72,6 +75,84 @@ public class CLITest extends PulsarTestSuite {
     }
 
     @Test
+    public void testGetTopicListCommand() throws Exception {
+        ContainerExecResult result;
+
+        final String namespaceLocalName = "list-topics-" + randomName(8);
+        result = pulsarCluster.createNamespace(namespaceLocalName);
+        final String namespace = "public/" + namespaceLocalName;
+        assertEquals(0, result.getExitCode());
+
+        PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+
+        final String persistentTopicName = TopicName.get(
+                "persistent",
+                NamespaceName.get(namespace),
+                "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+
+        final String nonPersistentTopicName = TopicName.get(
+                "non-persistent",
+                NamespaceName.get(namespace),
+                "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+
+        Producer<byte[]> producer1 = client.newProducer()
+                .topic(persistentTopicName)
+                .create();
+
+        Producer<byte[]> producer2 = client.newProducer()
+                .topic(nonPersistentTopicName)
+                .create();
+
+        BrokerContainer container = pulsarCluster.getAnyBroker();
+
+        result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "topics",
+                "list",
+                namespace);
+
+        assertTrue(result.getStdout().contains(persistentTopicName));
+        assertTrue(result.getStdout().contains(nonPersistentTopicName));
+
+        result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "topics",
+                "list",
+                "--topic-domain",
+                "persistent",
+                namespace);
+
+        assertTrue(result.getStdout().contains(persistentTopicName));
+        assertFalse(result.getStdout().contains(nonPersistentTopicName));
+
+        result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "topics",
+                "list",
+                "--topic-domain",
+                "non_persistent",
+                namespace);
+
+        assertFalse(result.getStdout().contains(persistentTopicName));
+        assertTrue(result.getStdout().contains(nonPersistentTopicName));
+
+        try {
+            container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "topics",
+                "list",
+                "--topic-domain",
+                "none",
+                namespace);
+            fail();
+        } catch (ContainerExecException ignore) {
+        }
+
+        producer1.close();
+        producer2.close();
+    }
+
+    @Test
     public void testCreateSubscriptionCommand() throws Exception {
         String topic = "testCreateSubscriptionCommmand";