You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/19 18:43:48 UTC

[incubator-pulsar] branch master updated: Fix lookup problem with partions in a non-persistent topics (#1251)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f1dc52  Fix lookup problem with partions in a non-persistent topics (#1251)
4f1dc52 is described below

commit 4f1dc52c7eeb77222bf1b6c39074c1df624e15d3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Feb 19 10:43:45 2018 -0800

    Fix lookup problem with partions in a non-persistent topics (#1251)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  4 +-
 .../pulsar/client/api/NonPersistentTopicTest.java  | 50 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b459011..5d83f8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1094,8 +1094,8 @@ public class PersistentTopicsBase extends AdminResource {
                 throw ex;
             }
 
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(),
-                    "persistent", dn.getEncodedLocalName());
+            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(), dn.getDomain().toString(),
+                    dn.getEncodedLocalName());
 
             // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
             // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index cebce57..5f45bc4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -186,6 +186,56 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
     }
 
+    @Test(dataProvider = "subscriptionType")
+    public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int numPartitions = 5;
+        final String topic = "non-persistent://my-property/use/my-ns/partitioned-topic";
+        admin.nonPersistentTopics().createPartitionedTopic(topic, numPartitions);
+
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + BROKER_PORT)
+                .statsInterval(0, TimeUnit.SECONDS).build();
+        Consumer consumer = client.newConsumer().topic(topic).subscriptionName("subscriber-1").subscriptionType(type)
+                .subscribe();
+
+        Producer producer = client.newProducer().topic(topic).create();
+
+        // Ensure all partitions exist
+        for (int i = 0; i < numPartitions; i++) {
+            DestinationName partition = DestinationName.get(topic).getPartition(i);
+            assertNotNull(pulsar.getBrokerService().getTopicReference(partition.toString()));
+        }
+
+        int totalProduceMsg = 500;
+        for (int i = 0; i < totalProduceMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+            Thread.sleep(10);
+        }
+
+        Message msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < totalProduceMsg; i++) {
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                String receivedMessage = new String(msg.getData());
+                log.debug("Received message: [{}]", receivedMessage);
+                String expectedMessage = "my-message-" + i;
+                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+            } else {
+                break;
+            }
+        }
+        assertEquals(messageSet.size(), totalProduceMsg);
+
+        producer.close();
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+        client.close();
+    }
+
     /**
      * It verifies that broker doesn't dispatch messages if consumer runs out of permits
      * filled out with messages

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.