You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/24 16:38:08 UTC
[pulsar] branch master updated: Issue #3332: fix httplookup issue
for get ns/topics in cpp (#3407)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0ec897f Issue #3332: fix httplookup issue for get ns/topics in cpp (#3407)
0ec897f is described below
commit 0ec897f7348bc5c7262b2dcce6e6b5f9a5343b0f
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri Jan 25 00:38:00 2019 +0800
Issue #3332: fix httplookup issue for get ns/topics in cpp (#3407)
* fix httplookup issue for get ns/topics
* change following comments
---
pulsar-client-cpp/lib/HTTPLookupService.cc | 9 ++-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 95 ++++++++++++++++++++++++++++
2 files changed, 101 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index b3cb023..e96c35d 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -310,18 +310,21 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json)
NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
Json::Value root;
Json::Reader reader;
+ LOG_DEBUG("GetNamespaceTopics json = " << json);
+
+ // passed in json is like: ["topic1", "topic2"...]
+ // root will be an array of topics
if (!reader.parse(json, root, false)) {
LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages()
<< "\nInput Json = " << json);
return NamespaceTopicsPtr();
}
- Json::Value topicsArray = root["topics"];
std::set<std::string> topicSet;
// get all topics
- for (int i = 0; i < topicsArray.size(); i++) {
+ for (int i = 0; i < root.size(); i++) {
// remove partition part
- const std::string &topicName = topicsArray[i].asString();
+ const std::string &topicName = root[i].asString();
int pos = topicName.find("-partition-");
std::string filteredName = topicName.substr(0, pos);
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 9a34fbe..4871b89 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1867,6 +1867,101 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) {
client.shutdown();
}
+// Use adminUrl to create the client, so that the HTTP-related services (e.g. HTTP lookup) are covered.
+TEST(BasicEndToEndTest, testpatternMultiTopicsHttpConsumerPubSub) {
+ Client client(adminUrl);
+ std::string pattern = "persistent://public/default/patternMultiTopicsHttpConsumer.*";  // matches the 3 topics below
+
+ std::string subName = "testpatternMultiTopicsHttpConsumer";
+ std::string topicName1 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub1";
+ std::string topicName2 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub2";
+ std::string topicName3 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub3";
+
+ // call the admin API to make the three topics partitioned (with 2, 3 and 4 partitions)
+ std::string url1 =
+ adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub1/partitions";
+ std::string url2 =
+ adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub2/partitions";
+ std::string url3 =
+ adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub3/partitions";
+
+ int res = makePutRequest(url1, "2");
+ ASSERT_FALSE(res != 204 && res != 409);  // only 204 or 409 pass; presumably 409 = already exists (confirm)
+ res = makePutRequest(url2, "3");
+ ASSERT_FALSE(res != 204 && res != 409);  // same accepted status codes as above
+ res = makePutRequest(url3, "4");
+ ASSERT_FALSE(res != 204 && res != 409);  // same accepted status codes as above
+
+ Producer producer1;
+ Result result = client.createProducer(topicName1, producer1);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer2;
+ result = client.createProducer(topicName2, producer2);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer3;
+ result = client.createProducer(topicName3, producer3);
+ ASSERT_EQ(ResultOk, result);
+
+ LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4");
+
+ int messageNumber = 100;  // number of messages sent by each producer
+ ConsumerConfiguration consConfig;
+ consConfig.setConsumerType(ConsumerShared);
+ consConfig.setReceiverQueueSize(10); // size for each sub-consumer
+ Consumer consumer;
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeWithRegexAsync(pattern, subName, consConfig,
+ WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+ LOG_INFO("created topics consumer on a pattern that match 3 topics");
+
+ std::string msgContent = "msg-content";
+ LOG_INFO("Publishing 100 messages by producer 1 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer1.send(msg));
+ }
+
+ msgContent = "msg-content2";
+ LOG_INFO("Publishing 100 messages by producer 2 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer2.send(msg));
+ }
+
+ msgContent = "msg-content3";
+ LOG_INFO("Publishing 100 messages by producer 3 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer3.send(msg));
+ }
+
+ LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
+ for (int i = 0; i < 3 * messageNumber; i++) {  // 3 producers, messageNumber messages each
+ Message m;
+ ASSERT_EQ(ResultOk, consumer.receive(m, 1000));  // 1000: receive timeout, presumably in ms (confirm)
+ ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+ }
+ LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+
+ // verify there is nothing more to receive: one further receive must time out
+ Message m;
+ ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
+
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+ client.shutdown();
+}
+
// create a pattern consumer, which contains no match topics at beginning.
// create 4 topics, in which 3 topics match the pattern.
// verify PatternMultiTopicsConsumer subscribed matched topics, after a while,