You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/06/28 17:22:14 UTC
[pulsar] branch master updated: fix get partitioned topics for non-persistent topics (#4613)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 362be2a fix get partitioned topics for non-persistent topics (#4613)
362be2a is described below
commit 362be2a901dcc8f70f94970fe05decdead1b4457
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jun 28 10:22:09 2019 -0700
fix get partitioned topics for non-persistent topics (#4613)
---
.../apache/pulsar/broker/admin/AdminResource.java | 2 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 41 +++++++++++++++++++++-
2 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index b4d4c36..ff4c123 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -583,7 +583,7 @@ public abstract class AdminResource extends PulsarWebResource {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), topicDomain.value());
List<String> topics = globalZk().getChildren(partitionedTopicPath, false);
partitionedTopics = topics.stream()
- .map(s -> String.format("persistent://%s/%s", namespaceName.toString(), decode(s)))
+ .map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (KeeperException.NoNodeException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index c65005b..95dffea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -19,16 +19,21 @@
package org.apache.pulsar.broker.admin;
import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -53,6 +58,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
private final String testNamespace = "my-namespace";
protected Field uriField;
protected UriInfo uriInfo;
+ private NonPersistentTopics nonPersistentTopic;
@BeforeClass
public void initPersistentTopics() throws Exception {
@@ -75,9 +81,25 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
doReturn(false).when(persistentTopics).isRequestHttps();
doReturn(null).when(persistentTopics).originalPrincipal();
doReturn("test").when(persistentTopics).clientAppId();
- doReturn("persistent").when(persistentTopics).domain();
+ doReturn(TopicDomain.persistent.value()).when(persistentTopics).domain();
doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();
+
+ nonPersistentTopic = spy(new NonPersistentTopics());
+ nonPersistentTopic.setServletContext(new MockServletContext());
+ nonPersistentTopic.setPulsar(pulsar);
+ doReturn(mockZookKeeper).when(nonPersistentTopic).globalZk();
+ doReturn(mockZookKeeper).when(nonPersistentTopic).localZk();
+ doReturn(pulsar.getConfigurationCache().propertiesCache()).when(nonPersistentTopic).tenantsCache();
+ doReturn(pulsar.getConfigurationCache().policiesCache()).when(nonPersistentTopic).policiesCache();
+ doReturn(false).when(nonPersistentTopic).isRequestHttps();
+ doReturn(null).when(nonPersistentTopic).originalPrincipal();
+ doReturn("test").when(nonPersistentTopic).clientAppId();
+ doReturn(TopicDomain.non_persistent.value()).when(nonPersistentTopic).domain();
+ doNothing().when(nonPersistentTopic).validateAdminAccessForTenant(this.testTenant);
+ doReturn(mock(AuthenticationDataHttps.class)).when(nonPersistentTopic).clientAuthData();
+
+
admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
admin.clusters().createCluster("test", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
admin.tenants().createTenant(this.testTenant,
@@ -159,4 +181,21 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
throw e;
}
}
+
+ @Test
+ public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
+ // Create one 3-partition topic through each resource (persistent and non-persistent) in the same namespace.
+ persistentTopics.createPartitionedTopic(testTenant, testNamespace, "test-topic1", 3);
+
+ nonPersistentTopic.createPartitionedTopic(testTenant, testNamespace, "test-topic2", 3);
+ // Listing via the persistent resource should yield exactly one name, and it must parse to the persistent domain.
+ List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
+
+ Assert.assertEquals(persistentPartitionedTopics.size(), 1);
+ Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.persistent.value());
+ // Listing via the non-persistent resource should likewise yield one name, parsed as the non-persistent domain.
+ List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace);
+ Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
+ Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
+ }
}