You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/06/28 17:22:14 UTC

[pulsar] branch master updated: fix get partitioned topics for non-persistent topics (#4613)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 362be2a  fix get partitioned topics for non-persistent topics (#4613)
362be2a is described below

commit 362be2a901dcc8f70f94970fe05decdead1b4457
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jun 28 10:22:09 2019 -0700

    fix get partitioned topics for non-persistent topics (#4613)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 41 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index b4d4c36..ff4c123 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -583,7 +583,7 @@ public abstract class AdminResource extends PulsarWebResource {
             String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), topicDomain.value());
             List<String> topics = globalZk().getChildren(partitionedTopicPath, false);
             partitionedTopics = topics.stream()
-                    .map(s -> String.format("persistent://%s/%s", namespaceName.toString(), decode(s)))
+                    .map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
                     .collect(Collectors.toList());
         } catch (KeeperException.NoNodeException e) {
             // NoNode means there are no partitioned topics in this domain for this namespace
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index c65005b..95dffea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -19,16 +19,21 @@
 package org.apache.pulsar.broker.admin;
 
 import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -53,6 +58,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     private final String testNamespace = "my-namespace";
     protected Field uriField;
     protected UriInfo uriInfo;
+    private NonPersistentTopics nonPersistentTopic;
 
     @BeforeClass
     public void initPersistentTopics() throws Exception {
@@ -75,9 +81,25 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(false).when(persistentTopics).isRequestHttps();
         doReturn(null).when(persistentTopics).originalPrincipal();
         doReturn("test").when(persistentTopics).clientAppId();
-        doReturn("persistent").when(persistentTopics).domain();
+        doReturn(TopicDomain.persistent.value()).when(persistentTopics).domain();
         doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
         doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();
+
+        nonPersistentTopic = spy(new NonPersistentTopics());
+        nonPersistentTopic.setServletContext(new MockServletContext());
+        nonPersistentTopic.setPulsar(pulsar);
+        doReturn(mockZookKeeper).when(nonPersistentTopic).globalZk();
+        doReturn(mockZookKeeper).when(nonPersistentTopic).localZk();
+        doReturn(pulsar.getConfigurationCache().propertiesCache()).when(nonPersistentTopic).tenantsCache();
+        doReturn(pulsar.getConfigurationCache().policiesCache()).when(nonPersistentTopic).policiesCache();
+        doReturn(false).when(nonPersistentTopic).isRequestHttps();
+        doReturn(null).when(nonPersistentTopic).originalPrincipal();
+        doReturn("test").when(nonPersistentTopic).clientAppId();
+        doReturn(TopicDomain.non_persistent.value()).when(nonPersistentTopic).domain();
+        doNothing().when(nonPersistentTopic).validateAdminAccessForTenant(this.testTenant);
+        doReturn(mock(AuthenticationDataHttps.class)).when(nonPersistentTopic).clientAuthData();
+
+
         admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
         admin.clusters().createCluster("test", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
         admin.tenants().createTenant(this.testTenant,
@@ -159,4 +181,21 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
             throw e;
         }
     }
+
+    @Test
+    public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
+
+        persistentTopics.createPartitionedTopic(testTenant, testNamespace, "test-topic1", 3);
+
+        nonPersistentTopic.createPartitionedTopic(testTenant, testNamespace, "test-topic2", 3);
+
+        List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
+
+        Assert.assertEquals(persistentPartitionedTopics.size(), 1);
+        Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.persistent.value());
+
+        List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace);
+        Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
+        Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
+    }
 }