You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:34 UTC
[rocketmq-client-cpp] 17/29: refactor: NamespaceUtil
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 505ea47e46439cbb273bd7dc38e4bda895a4265f
Author: James Yin <yw...@hotmail.com>
AuthorDate: Mon Sep 21 15:32:28 2020 +0800
refactor: NamespaceUtil
---
include/MQClientConfig.h | 5 +-
include/MQClientConfigProxy.h | 5 +-
src/MQClientConfigImpl.hpp | 10 +-
src/common/NameSpaceUtil.cpp | 40 --------
src/common/NamespaceUtil.cpp | 119 ++++++++++++++++++++++++
src/common/{NameSpaceUtil.h => NamespaceUtil.h} | 18 ++--
src/common/UtilAll.cpp | 12 ++-
src/common/UtilAll.h | 6 +-
8 files changed, 161 insertions(+), 54 deletions(-)
diff --git a/include/MQClientConfig.h b/include/MQClientConfig.h
index ade13f0..645fc05 100644
--- a/include/MQClientConfig.h
+++ b/include/MQClientConfig.h
@@ -50,7 +50,10 @@ class ROCKETMQCLIENT_API MQClientConfig {
virtual void set_instance_name(const std::string& instanceName) = 0;
virtual const std::string& unit_name() const = 0;
- virtual void set_unit_name(std::string unitName) = 0;
+ virtual void set_unit_name(const std::string& unitName) = 0;
+
+ virtual const std::string& name_space() const = 0;
+ virtual void set_name_space(const std::string& name_space) = 0;
/**
* the num of threads to distribute network data
diff --git a/include/MQClientConfigProxy.h b/include/MQClientConfigProxy.h
index 8cea1bb..981fc7f 100644
--- a/include/MQClientConfigProxy.h
+++ b/include/MQClientConfigProxy.h
@@ -44,7 +44,10 @@ class ROCKETMQCLIENT_API MQClientConfigProxy : virtual public MQClientConfig //
void set_instance_name(const std::string& instanceName) override { client_config_->set_instance_name(instanceName); }
const std::string& unit_name() const override { return client_config_->unit_name(); }
- void set_unit_name(std::string unitName) override { client_config_->set_unit_name(unitName); }
+ void set_unit_name(const std::string& unitName) override { client_config_->set_unit_name(unitName); }
+
+ const std::string& name_space() const override { return client_config_->name_space(); }
+ void set_name_space(const std::string& name_space) override { client_config_->set_name_space(name_space); }
int tcp_transport_worker_thread_nums() const override { return client_config_->tcp_transport_worker_thread_nums(); }
void set_tcp_transport_worker_thread_nums(int num) override {
diff --git a/src/MQClientConfigImpl.hpp b/src/MQClientConfigImpl.hpp
index c0aa341..59b4601 100644
--- a/src/MQClientConfigImpl.hpp
+++ b/src/MQClientConfigImpl.hpp
@@ -21,7 +21,7 @@
#include <thread> // std::thread::hardware_concurrency
#include "MQClientConfig.h"
-#include "NameSpaceUtil.h"
+#include "NamespaceUtil.h"
#include "UtilAll.h"
namespace rocketmq {
@@ -67,14 +67,17 @@ class MQClientConfigImpl : virtual public MQClientConfig {
const std::string& namesrv_addr() const override { return namesrv_addr_; }
void set_namesrv_addr(const std::string& namesrvAddr) override {
- namesrv_addr_ = NameSpaceUtil::formatNameServerURL(namesrvAddr);
+ namesrv_addr_ = NamespaceUtil::formatNameServerURL(namesrvAddr);
}
const std::string& instance_name() const override { return instance_name_; }
void set_instance_name(const std::string& instanceName) override { instance_name_ = instanceName; }
const std::string& unit_name() const override { return unit_name_; }
- void set_unit_name(std::string unitName) override { unit_name_ = unitName; }
+ void set_unit_name(const std::string& unitName) override { unit_name_ = unitName; }
+
+ const std::string& name_space() const override { return name_space_; }
+ void set_name_space(const std::string& name_space) override { name_space_ = name_space; }
int tcp_transport_worker_thread_nums() const override { return tcp_worker_thread_nums_; }
void set_tcp_transport_worker_thread_nums(int num) override {
@@ -96,6 +99,7 @@ class MQClientConfigImpl : virtual public MQClientConfig {
std::string instance_name_;
std::string group_name_;
std::string unit_name_;
+ std::string name_space_;
int tcp_worker_thread_nums_;
uint64_t tcp_connect_timeout; // ms
diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp
deleted file mode 100644
index f7ceec7..0000000
--- a/src/common/NameSpaceUtil.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "NameSpaceUtil.h"
-
-#include "Logging.h"
-
-namespace rocketmq {
-
-bool NameSpaceUtil::isEndPointURL(std::string nameServerAddr) {
- if (nameServerAddr.length() >= ENDPOINT_PREFIX_LENGTH && nameServerAddr.find(ENDPOINT_PREFIX) != std::string::npos) {
- return true;
- }
- return false;
-}
-
-std::string NameSpaceUtil::formatNameServerURL(std::string nameServerAddr) {
- auto index = nameServerAddr.find(ENDPOINT_PREFIX);
- if (index != std::string::npos) {
- LOG_DEBUG("Get Name Server from endpoint [%s]",
- nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH, nameServerAddr.length() - ENDPOINT_PREFIX_LENGTH).c_str());
- return nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH, nameServerAddr.length() - ENDPOINT_PREFIX_LENGTH);
- }
- return nameServerAddr;
-}
-
-} // namespace rocketmq
diff --git a/src/common/NamespaceUtil.cpp b/src/common/NamespaceUtil.cpp
new file mode 100644
index 0000000..96391c5
--- /dev/null
+++ b/src/common/NamespaceUtil.cpp
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NamespaceUtil.h"
+
+#include "Logging.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+static const char NAMESPACE_SEPARATOR = '%';  // placed between the namespace and the resource name by wrapNamespace()
+static const std::string STRING_BLANK = "";  // NOTE(review): not referenced by any function shown in this file
+static const size_t RETRY_PREFIX_LENGTH = RETRY_GROUP_TOPIC_PREFIX.length();  // characters dropped by withoutRetryAndDLQ() for retry topics; NOTE(review): the prefix is declared outside this file - confirm it is initialized before this static
+static const size_t DLQ_PREFIX_LENGTH = DLQ_GROUP_TOPIC_PREFIX.length();  // characters dropped by withoutRetryAndDLQ() for DLQ topics; same initialization-order caveat as above
+
+static const std::string ENDPOINT_PREFIX = "http://";  // leading scheme that formatNameServerURL() removes
+static const size_t ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();  // offset at which the address proper starts
+
+// Strips the "<namespace>%" part from a resource name.
+//
+// A retry or DLQ topic prefix on the input is removed before looking for the separator and is
+// put back on the result through UtilAll::getRetryTopic / UtilAll::getDLQTopic.
+//
+// @param resourceWithNamespace  resource name that may carry a namespace
+// @return the name without its namespace; the input is returned unchanged when it is empty,
+//         is a system resource, contains no separator after the retry/DLQ prefix, or has the
+//         separator at position 0
+std::string NamespaceUtil::withoutNamespace(const std::string& resourceWithNamespace) {
+  if (resourceWithNamespace.empty() || isSystemResource(resourceWithNamespace)) {
+    return resourceWithNamespace;
+  }
+
+  const auto resourceWithoutRetryAndDLQ = withoutRetryAndDLQ(resourceWithNamespace);
+  const auto index = resourceWithoutRetryAndDLQ.find(NAMESPACE_SEPARATOR);
+  // find() signals "not found" with std::string::npos, which compares greater than zero.
+  // Test it explicitly instead of relying on npos + 1 wrapping around to 0 inside substr().
+  if (index == std::string::npos || index == 0) {
+    return resourceWithNamespace;
+  }
+
+  const auto resourceWithoutNamespace = resourceWithoutRetryAndDLQ.substr(index + 1);
+  if (UtilAll::isRetryTopic(resourceWithNamespace)) {
+    return UtilAll::getRetryTopic(resourceWithoutNamespace);
+  }
+  if (UtilAll::isDLQTopic(resourceWithNamespace)) {
+    return UtilAll::getDLQTopic(resourceWithoutNamespace);
+  }
+  return resourceWithoutNamespace;
+}
+
+// Strips the namespace from a resource name, but only when that namespace is `name_space`.
+//
+// @param resourceWithNamespace  resource name that may carry a namespace
+// @param name_space             the namespace expected in front of the resource
+// @return the result of the single-argument withoutNamespace() when the resource (ignoring any
+//         retry/DLQ prefix) starts with "<name_space>%"; otherwise the input unchanged
+std::string NamespaceUtil::withoutNamespace(const std::string& resourceWithNamespace, const std::string& name_space) {
+  const bool nothingToStrip = resourceWithNamespace.empty() || name_space.empty();
+  if (nothingToStrip) {
+    return resourceWithNamespace;
+  }
+
+  const std::string expectedPrefix = name_space + NAMESPACE_SEPARATOR;
+  const std::string bareResource = withoutRetryAndDLQ(resourceWithNamespace);
+  const bool carriesNamespace = bareResource.compare(0, expectedPrefix.length(), expectedPrefix) == 0;
+
+  return carriesNamespace ? withoutNamespace(resourceWithNamespace) : resourceWithNamespace;
+}
+
+// Prefixes a resource name with "<name_space>%".
+//
+// When the resource is a retry or DLQ topic, the retry/DLQ prefix stays at the very front and
+// the namespace is inserted right after it.
+//
+// @param name_space                namespace to apply
+// @param resourceWithoutNamespace  resource name to wrap
+// @return the wrapped name, or the resource unchanged when either argument is empty
+std::string NamespaceUtil::wrapNamespace(const std::string& name_space, const std::string& resourceWithoutNamespace) {
+  if (name_space.empty() || resourceWithoutNamespace.empty()) {
+    return resourceWithoutNamespace;
+  }
+
+  // The following guard is disabled, so system resources and already-wrapped names are wrapped too:
+  // if (isSystemResource(resourceWithoutNamespace) || isAlreadyWithNamespace(resourceWithoutNamespace, namespace)) {
+  //   return resourceWithoutNamespace;
+  // }
+
+  std::string wrapped;
+  if (UtilAll::isRetryTopic(resourceWithoutNamespace)) {
+    wrapped += RETRY_GROUP_TOPIC_PREFIX;
+  }
+  if (UtilAll::isDLQTopic(resourceWithoutNamespace)) {
+    wrapped += DLQ_GROUP_TOPIC_PREFIX;
+  }
+
+  wrapped += name_space;
+  wrapped += NAMESPACE_SEPARATOR;
+  wrapped += withoutRetryAndDLQ(resourceWithoutNamespace);
+  return wrapped;
+}
+
+// Drops a leading retry or DLQ topic prefix from a resource name.
+//
+// @param originalResource  resource name, with or without such a prefix
+// @return the part after the prefix, or a copy of the input when neither prefix is present
+std::string NamespaceUtil::withoutRetryAndDLQ(const std::string& originalResource) {
+  // Number of leading characters to skip; zero keeps the whole name.
+  const size_t skip = UtilAll::isRetryTopic(originalResource)
+                          ? RETRY_PREFIX_LENGTH
+                          : (UtilAll::isDLQTopic(originalResource) ? DLQ_PREFIX_LENGTH : 0);
+  return originalResource.substr(skip);
+}
+
+bool NamespaceUtil::isSystemResource(const std::string& resource) {  // stub: `resource` is never inspected
+ return false;  // every name is reported as a non-system resource, so withoutNamespace() never short-circuits on this check
+}
+
+// Tells whether a name server address begins with the "http://" endpoint prefix.
+//
+// @param nameServerAddr  address string to examine
+// @return true when the address starts with ENDPOINT_PREFIX, false otherwise
+bool NamespaceUtil::isEndPointURL(const std::string& nameServerAddr) {
+  const bool startsWithPrefix = nameServerAddr.compare(0, ENDPOINT_PREFIX_LENGTH, ENDPOINT_PREFIX) == 0;
+  return startsWithPrefix;
+}
+
+// Removes a leading "http://" from a name server address.
+//
+// @param nameServerAddr  address string, with or without the endpoint prefix
+// @return the address after the prefix, or a copy of the input when the prefix is absent
+std::string NamespaceUtil::formatNameServerURL(const std::string& nameServerAddr) {
+  const bool hasEndpointPrefix = nameServerAddr.compare(0, ENDPOINT_PREFIX_LENGTH, ENDPOINT_PREFIX) == 0;
+  return hasEndpointPrefix ? nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH) : nameServerAddr;
+}
+
+} // namespace rocketmq
diff --git a/src/common/NameSpaceUtil.h b/src/common/NamespaceUtil.h
similarity index 62%
rename from src/common/NameSpaceUtil.h
rename to src/common/NamespaceUtil.h
index 2e19c0a..6255d5f 100644
--- a/src/common/NameSpaceUtil.h
+++ b/src/common/NamespaceUtil.h
@@ -21,14 +21,20 @@
namespace rocketmq {
-static const std::string ENDPOINT_PREFIX = "http://";
-static const unsigned int ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();
-
-class NameSpaceUtil {
+class NamespaceUtil {
public:
- static bool isEndPointURL(std::string nameServerAddr);
+ static std::string withoutNamespace(const std::string& resourceWithNamespace);
+ static std::string withoutNamespace(const std::string& resourceWithNamespace, const std::string& name_space);
+
+ static std::string wrapNamespace(const std::string& name_space, const std::string& resourceWithoutNamespace);
+
+ static std::string withoutRetryAndDLQ(const std::string& originalResource);
+
+ static bool isSystemResource(const std::string& resource);
+
+ static bool isEndPointURL(const std::string& nameServerAddr);
- static std::string formatNameServerURL(std::string nameServerAddr);
+ static std::string formatNameServerURL(const std::string& nameServerAddr);
};
} // namespace rocketmq
diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp
index 658695f..90dec5b 100644
--- a/src/common/UtilAll.cpp
+++ b/src/common/UtilAll.cpp
@@ -117,14 +117,22 @@ void UtilAll::string2bytes(char* dest, const std::string& src) {
}
}
-bool UtilAll::isRetryTopic(const std::string& topic) {
- return topic.find(RETRY_GROUP_TOPIC_PREFIX) == 0;
+bool UtilAll::isRetryTopic(const std::string& resource) {
+ return resource.find(RETRY_GROUP_TOPIC_PREFIX) == 0;
+}
+
+bool UtilAll::isDLQTopic(const std::string& resource) {
+ return resource.find(DLQ_GROUP_TOPIC_PREFIX) == 0;
}
std::string UtilAll::getRetryTopic(const std::string& consumerGroup) {
return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
}
+std::string UtilAll::getDLQTopic(const std::string& consumerGroup) {
+ return DLQ_GROUP_TOPIC_PREFIX + consumerGroup;
+}
+
std::string UtilAll::getReplyTopic(const std::string& clusterName) {
return clusterName + "_" + REPLY_TOPIC_POSTFIX;
}
diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h
index 5193714..8ad49b0 100644
--- a/src/common/UtilAll.h
+++ b/src/common/UtilAll.h
@@ -97,8 +97,12 @@ class UtilAll {
static std::string bytes2string(const char* bytes, size_t len);
static void string2bytes(char* dest, const std::string& src);
- static bool isRetryTopic(const std::string& topic);
+ static bool isRetryTopic(const std::string& resource);
+ static bool isDLQTopic(const std::string& resource);
+
static std::string getRetryTopic(const std::string& consumerGroup);
+ static std::string getDLQTopic(const std::string& consumerGroup);
+
static std::string getReplyTopic(const std::string& clusterName);
static void Trim(std::string& str);