You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/24 16:38:08 UTC

[pulsar] branch master updated: Issue #3332: fix httplookup issue for get ns/topics in cpp (#3407)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ec897f  Issue #3332: fix httplookup issue for get ns/topics in cpp (#3407)
0ec897f is described below

commit 0ec897f7348bc5c7262b2dcce6e6b5f9a5343b0f
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri Jan 25 00:38:00 2019 +0800

    Issue #3332: fix httplookup issue for get ns/topics in cpp (#3407)
    
    * fix httplookup issue for get ns/topics
    
    * apply changes following review comments
---
 pulsar-client-cpp/lib/HTTPLookupService.cc   |  9 ++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc | 95 ++++++++++++++++++++++++++++
 2 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index b3cb023..e96c35d 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -310,18 +310,21 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json)
 NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
     Json::Value root;
     Json::Reader reader;
+    LOG_DEBUG("GetNamespaceTopics json = " << json);
+
+    // passed in json is like: ["topic1", "topic2"...]
+    // root will be an array of topics
     if (!reader.parse(json, root, false)) {
         LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages()
                                                                   << "\nInput Json = " << json);
         return NamespaceTopicsPtr();
     }
 
-    Json::Value topicsArray = root["topics"];
     std::set<std::string> topicSet;
     // get all topics
-    for (int i = 0; i < topicsArray.size(); i++) {
+    for (int i = 0; i < root.size(); i++) {
         // remove partition part
-        const std::string &topicName = topicsArray[i].asString();
+        const std::string &topicName = root[i].asString();
         int pos = topicName.find("-partition-");
         std::string filteredName = topicName.substr(0, pos);
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 9a34fbe..4871b89 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1867,6 +1867,101 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) {
     client.shutdown();
 }
 
+// Use adminUrl to create the client, so that the HTTP-based lookup path is exercised (see issue #3332).
+TEST(BasicEndToEndTest, testpatternMultiTopicsHttpConsumerPubSub) {
+    Client client(adminUrl);
+    std::string pattern = "persistent://public/default/patternMultiTopicsHttpConsumer.*";
+
+    std::string subName = "testpatternMultiTopicsHttpConsumer";
+    std::string topicName1 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub1";
+    std::string topicName2 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub2";
+    std::string topicName3 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub3";
+
+    // call the admin API to make the three topics partitioned (with 2, 3 and 4 partitions respectively)
+    std::string url1 =
+        adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub1/partitions";
+    std::string url2 =
+        adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub2/partitions";
+    std::string url3 =
+        adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub3/partitions";
+
+    int res = makePutRequest(url1, "2");
+    ASSERT_FALSE(res != 204 && res != 409);  // only 204 or 409 is accepted (409 presumably = topic already exists; TODO confirm)
+    res = makePutRequest(url2, "3");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    Producer producer1;
+    Result result = client.createProducer(topicName1, producer1);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer2;
+    result = client.createProducer(topicName2, producer2);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer3;
+    result = client.createProducer(topicName3, producer3);
+    ASSERT_EQ(ResultOk, result);
+
+    LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4");
+
+    int messageNumber = 100;
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setReceiverQueueSize(10);  // size for each sub-consumer
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeWithRegexAsync(pattern, subName, consConfig,
+                                   WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+    LOG_INFO("created topics consumer on a pattern that match 3 topics");
+
+    std::string msgContent = "msg-content";
+    LOG_INFO("Publishing 100 messages by producer 1 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer1.send(msg));
+    }
+
+    msgContent = "msg-content2";
+    LOG_INFO("Publishing 100 messages by producer 2 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer2.send(msg));
+    }
+
+    msgContent = "msg-content3";
+    LOG_INFO("Publishing 100 messages by producer 3 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer3.send(msg));
+    }
+
+    LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
+    for (int i = 0; i < 3 * messageNumber; i++) {  // 3 producers, messageNumber messages each
+        Message m;
+        ASSERT_EQ(ResultOk, consumer.receive(m, 1000));  // second argument is presumably a timeout in ms; TODO confirm
+        ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+    }
+    LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+
+    // verify there is nothing more to receive: one further receive must end with ResultTimeout
+    Message m;
+    ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+    client.shutdown();
+}
+
 // create a pattern consumer, which contains no match topics at beginning.
 // create 4 topics, in which 3 topics match the pattern.
 // verify PatternMultiTopicsConsumer subscribed matched topics, after a while,