You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/19 18:43:48 UTC
[incubator-pulsar] branch master updated: Fix lookup problem with
partitions in non-persistent topics (#1251)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4f1dc52 Fix lookup problem with partitions in non-persistent topics (#1251)
4f1dc52 is described below
commit 4f1dc52c7eeb77222bf1b6c39074c1df624e15d3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Feb 19 10:43:45 2018 -0800
Fix lookup problem with partitions in non-persistent topics (#1251)
---
.../broker/admin/impl/PersistentTopicsBase.java | 4 +-
.../pulsar/client/api/NonPersistentTopicTest.java | 50 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b459011..5d83f8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1094,8 +1094,8 @@ public class PersistentTopicsBase extends AdminResource {
throw ex;
}
- String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(),
- "persistent", dn.getEncodedLocalName());
+ String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(), dn.getDomain().toString(),
+ dn.getEncodedLocalName());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index cebce57..5f45bc4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -186,6 +186,56 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
}
+ @Test(dataProvider = "subscriptionType")
+ public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final int numPartitions = 5;
+ final String topic = "non-persistent://my-property/use/my-ns/partitioned-topic";
+ admin.nonPersistentTopics().createPartitionedTopic(topic, numPartitions);
+
+ PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + BROKER_PORT)
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ Consumer consumer = client.newConsumer().topic(topic).subscriptionName("subscriber-1").subscriptionType(type)
+ .subscribe();
+
+ Producer producer = client.newProducer().topic(topic).create();
+
+ // Ensure all partitions exist
+ for (int i = 0; i < numPartitions; i++) {
+ DestinationName partition = DestinationName.get(topic).getPartition(i);
+ assertNotNull(pulsar.getBrokerService().getTopicReference(partition.toString()));
+ }
+
+ int totalProduceMsg = 500;
+ for (int i = 0; i < totalProduceMsg; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ Thread.sleep(10);
+ }
+
+ Message msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < totalProduceMsg; i++) {
+ msg = consumer.receive(1, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.acknowledge(msg);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ } else {
+ break;
+ }
+ }
+ assertEquals(messageSet.size(), totalProduceMsg);
+
+ producer.close();
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ client.close();
+ }
+
/**
* It verifies that broker doesn't dispatch messages if consumer runs out of permits
* filled out with messages
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.