You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/04/16 01:27:13 UTC
[incubator-pulsar] branch master updated: Support short topic name
in cpp client (#1564)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b37d7b7 Support short topic name in cpp client (#1564)
b37d7b7 is described below
commit b37d7b7a64340659e1994b568d7f4ff562865505
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Sun Apr 15 18:27:10 2018 -0700
Support short topic name in cpp client (#1564)
* Support short topic name in cpp client
* Fix format issue
* fix test
* fix test failures
* Fix format issue
---
pulsar-client-cpp/lib/TopicName.cc | 86 ++++++++++++++++++++++------
pulsar-client-cpp/lib/TopicName.h | 4 +-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 4 +-
pulsar-client-cpp/tests/TopicNameTest.cc | 36 ++++++++++++
4 files changed, 111 insertions(+), 19 deletions(-)
diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc
index 1a79a5d..0ad1313 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -52,10 +52,30 @@ TopicName::TopicName() {}
bool TopicName::init(const std::string& topicName) {
topicName_ = topicName;
if (topicName.find("://") == std::string::npos) {
- LOG_ERROR("Topic name is not valid, domain not present - " << topicName);
+ std::string topicNameCopy_ = topicName;
+ std::vector<std::string> pathTokens;
+ boost::algorithm::split(pathTokens, topicNameCopy_, boost::algorithm::is_any_of("/"));
+ if (pathTokens.size() == 3) {
+ topicName_ = "persistent://" + pathTokens[0] + "/" + pathTokens[1] + "/" + pathTokens[2];
+ } else if (pathTokens.size() == 1) {
+ topicName_ = "persistent://public/default/" + pathTokens[0];
+ } else {
+ LOG_ERROR(
+ "Topic name is not valid, short topic name should be in the format of '<topic>' or "
+ "'<property>/<namespace>/<topic>' - "
+ << topicName);
+ return false;
+ }
+ }
+ isV2Topic_ = parse(topicName_, domain_, property_, cluster_, namespacePortion_, localName_);
+ if (isV2Topic_ && !cluster_.empty()) {
+ LOG_ERROR("V2 Topic name is not valid, cluster is not empty - " << topicName_ << " : cluster "
+ << cluster_);
+ return false;
+ } else if (!isV2Topic_ && cluster_.empty()) {
+ LOG_ERROR("V1 Topic name is not valid, cluster is empty - " << topicName_);
return false;
}
- parse(topicName_, domain_, property_, cluster_, namespacePortion_, localName_);
if (localName_.empty()) {
LOG_ERROR("Topic name is not valid, topic name is empty - " << topicName_);
return false;
@@ -63,28 +83,45 @@ bool TopicName::init(const std::string& topicName) {
namespaceName_ = NamespaceName::get(property_, cluster_, namespacePortion_);
return true;
}
-void TopicName::parse(const std::string& topicName, std::string& domain, std::string& property,
+bool TopicName::parse(const std::string& topicName, std::string& domain, std::string& property,
std::string& cluster, std::string& namespacePortion, std::string& localName) {
std::string topicNameCopy = topicName;
boost::replace_first(topicNameCopy, "://", "/");
std::vector<std::string> pathTokens;
boost::algorithm::split(pathTokens, topicNameCopy, boost::algorithm::is_any_of("/"));
- if (pathTokens.size() < 5) {
+ if (pathTokens.size() < 4) {
LOG_ERROR("Topic name is not valid, does not have enough parts - " << topicName);
- return;
+ return false;
}
domain = pathTokens[0];
- property = pathTokens[1];
- cluster = pathTokens[2];
- namespacePortion = pathTokens[3];
+ size_t numSlashIndexes;
+ bool isV2Topic;
+ if (pathTokens.size() == 4) {
+ // New topic name without cluster name
+ property = pathTokens[1];
+ cluster = "";
+ namespacePortion = pathTokens[2];
+ localName = pathTokens[3];
+ numSlashIndexes = 3;
+ isV2Topic = true;
+ } else {
+ // Legacy topic name that includes cluster name
+ property = pathTokens[1];
+ cluster = pathTokens[2];
+ namespacePortion = pathTokens[3];
+ localName = pathTokens[4];
+ numSlashIndexes = 4;
+ isV2Topic = false;
+ }
size_t slashIndex = -1;
- // find four '/', whatever is left is topic local name
- for (int i = 0; i < 4; i++) {
+ // find `numSlashIndexes` '/', whatever is left is topic local name
+ for (int i = 0; i < numSlashIndexes; i++) {
slashIndex = topicNameCopy.find('/', slashIndex + 1);
}
// get index to next char to '/'
slashIndex++;
localName = topicNameCopy.substr(slashIndex, (topicNameCopy.size() - slashIndex));
+ return isV2Topic;
}
std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) {
Lock lock(curlHandleMutex);
@@ -104,6 +141,8 @@ std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) {
return nameAfterEncoding;
}
+bool TopicName::isV2Topic() { return isV2Topic_; }
+
std::string TopicName::getDomain() { return domain_; }
std::string TopicName::getProperty() { return property_; }
@@ -125,9 +164,15 @@ bool TopicName::validate() {
if (domain_.compare("persistent") != 0) {
return false;
}
- if (!property_.empty() && !cluster_.empty() && !namespacePortion_.empty() && !localName_.empty()) {
+ // cluster_ can be empty
+ if (!isV2Topic_ && !property_.empty() && !cluster_.empty() && !namespacePortion_.empty() &&
+ !localName_.empty()) {
+ // v1 topic format
return NamedEntity::checkName(property_) && NamedEntity::checkName(cluster_) &&
NamedEntity::checkName(namespacePortion_);
+ } else if (isV2Topic_ && !property_.empty() && !namespacePortion_.empty() && !localName_.empty()) {
+ // v2 topic format
+ return NamedEntity::checkName(property_) && NamedEntity::checkName(namespacePortion_);
} else {
return false;
}
@@ -142,7 +187,7 @@ boost::shared_ptr<TopicName> TopicName::get(const std::string& topicName) {
if (ptr->validate()) {
return ptr;
} else {
- LOG_ERROR("Topic name validation Failed");
+ LOG_ERROR("Topic name validation Failed - " << topicName);
return boost::shared_ptr<TopicName>();
}
}
@@ -151,16 +196,25 @@ boost::shared_ptr<TopicName> TopicName::get(const std::string& topicName) {
std::string TopicName::getLookupName() {
std::stringstream ss;
std::string seperator("/");
- ss << domain_ << seperator << property_ << seperator << cluster_ << seperator << namespacePortion_
- << seperator << getEncodedLocalName();
+ if (isV2Topic_ && cluster_.empty()) {
+ ss << domain_ << seperator << property_ << seperator << namespacePortion_ << seperator
+ << getEncodedLocalName();
+ } else {
+ ss << domain_ << seperator << property_ << seperator << cluster_ << seperator << namespacePortion_
+ << seperator << getEncodedLocalName();
+ }
return ss.str();
}
std::string TopicName::toString() {
std::stringstream ss;
std::string seperator("/");
- ss << domain_ << "://" << property_ << seperator << cluster_ << seperator << namespacePortion_
- << seperator << localName_;
+ if (isV2Topic_ && cluster_.empty()) {
+ ss << domain_ << "://" << property_ << seperator << namespacePortion_ << seperator << localName_;
+ } else {
+ ss << domain_ << "://" << property_ << seperator << cluster_ << seperator << namespacePortion_
+ << seperator << localName_;
+ }
return ss.str();
}
diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h
index b8d610c..91b7994 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -38,9 +38,11 @@ class TopicName : public ServiceUnitId {
std::string cluster_;
std::string namespacePortion_;
std::string localName_;
+ bool isV2Topic_;
boost::shared_ptr<NamespaceName> namespaceName_;
public:
+ bool isV2Topic();
std::string getLookupName();
std::string getDomain();
std::string getProperty();
@@ -58,7 +60,7 @@ class TopicName : public ServiceUnitId {
static CURL* getCurlHandle();
static CURL* curl;
static boost::mutex curlHandleMutex;
- static void parse(const std::string& topicName, std::string& domain, std::string& property,
+ static bool parse(const std::string& topicName, std::string& domain, std::string& property,
std::string& cluster, std::string& namespacePortion, std::string& localName);
TopicName();
bool validate();
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index dfbf44e..f89096c 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -240,11 +240,11 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
TEST(BasicEndToEndTest, testNonExistingTopic) {
Client client(lookupUrl);
Producer producer;
- Result result = client.createProducer("persistent://prop/unit/ns1", producer);
+ Result result = client.createProducer("persistent://prop//unit/ns1/testNonExistingTopic", producer);
ASSERT_EQ(ResultInvalidTopicName, result);
Consumer consumer;
- result = client.subscribe("persistent://prop/unit/ns1", "my-sub-name", consumer);
+ result = client.subscribe("persistent://prop//unit/ns1/testNonExistingTopic", "my-sub-name", consumer);
ASSERT_EQ(ResultInvalidTopicName, result);
}
diff --git a/pulsar-client-cpp/tests/TopicNameTest.cc b/pulsar-client-cpp/tests/TopicNameTest.cc
index 373bbef..5c9410a 100644
--- a/pulsar-client-cpp/tests/TopicNameTest.cc
+++ b/pulsar-client-cpp/tests/TopicNameTest.cc
@@ -42,6 +42,42 @@ TEST(TopicNameTest, testTopicName) {
ASSERT_TRUE(*topicName1 == *topicName2);
}
+TEST(TopicNameTest, testShortTopicName) {
+ // "short-topic"
+ boost::shared_ptr<TopicName> tn1 = TopicName::get("short-topic");
+ ASSERT_EQ("public", tn1->getProperty());
+ ASSERT_EQ("", tn1->getCluster());
+ ASSERT_EQ("default", tn1->getNamespacePortion());
+ ASSERT_EQ("persistent", tn1->getDomain());
+ ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn1->getLocalName());
+
+ // tenant/namespace/topic
+ boost::shared_ptr<TopicName> tn2 = TopicName::get("tenant/namespace/short-topic");
+ ASSERT_EQ("tenant", tn2->getProperty());
+ ASSERT_EQ("", tn2->getCluster());
+ ASSERT_EQ("namespace", tn2->getNamespacePortion());
+ ASSERT_EQ("persistent", tn2->getDomain());
+ ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn2->getLocalName());
+
+ // tenant/cluster/namespace/topic
+ boost::shared_ptr<TopicName> tn3 = TopicName::get("tenant/cluster/namespace/short-topic");
+ ASSERT_FALSE(tn3);
+
+ // tenant/cluster
+ boost::shared_ptr<TopicName> tn4 = TopicName::get("tenant/cluster");
+ ASSERT_FALSE(tn4);
+}
+
+TEST(TopicNameTest, testTopicNameV2) {
+ // v2 topic names doesn't have "cluster"
+ boost::shared_ptr<TopicName> tn1 = TopicName::get("persistent://tenant/namespace/short-topic");
+ ASSERT_EQ("tenant", tn1->getProperty());
+ ASSERT_EQ("", tn1->getCluster());
+ ASSERT_EQ("namespace", tn1->getNamespacePortion());
+ ASSERT_EQ("persistent", tn1->getDomain());
+ ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn1->getLocalName());
+}
+
TEST(TopicNameTest, testTopicNameWithSlashes) {
// Compare getters and setters
boost::shared_ptr<TopicName> topicName =
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.