You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:34 UTC

[rocketmq-client-cpp] 17/29: refactor: NamespaceUtil

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 505ea47e46439cbb273bd7dc38e4bda895a4265f
Author: James Yin <yw...@hotmail.com>
AuthorDate: Mon Sep 21 15:32:28 2020 +0800

    refactor: NamespaceUtil
---
 include/MQClientConfig.h                        |   5 +-
 include/MQClientConfigProxy.h                   |   5 +-
 src/MQClientConfigImpl.hpp                      |  10 +-
 src/common/NameSpaceUtil.cpp                    |  40 --------
 src/common/NamespaceUtil.cpp                    | 119 ++++++++++++++++++++++++
 src/common/{NameSpaceUtil.h => NamespaceUtil.h} |  18 ++--
 src/common/UtilAll.cpp                          |  12 ++-
 src/common/UtilAll.h                            |   6 +-
 8 files changed, 161 insertions(+), 54 deletions(-)

diff --git a/include/MQClientConfig.h b/include/MQClientConfig.h
index ade13f0..645fc05 100644
--- a/include/MQClientConfig.h
+++ b/include/MQClientConfig.h
@@ -50,7 +50,10 @@ class ROCKETMQCLIENT_API MQClientConfig {
   virtual void set_instance_name(const std::string& instanceName) = 0;
 
   virtual const std::string& unit_name() const = 0;
-  virtual void set_unit_name(std::string unitName) = 0;
+  virtual void set_unit_name(const std::string& unitName) = 0;
+
+  virtual const std::string& name_space() const = 0;
+  virtual void set_name_space(const std::string& name_space) = 0;
 
   /**
    * the num of threads to distribute network data
diff --git a/include/MQClientConfigProxy.h b/include/MQClientConfigProxy.h
index 8cea1bb..981fc7f 100644
--- a/include/MQClientConfigProxy.h
+++ b/include/MQClientConfigProxy.h
@@ -44,7 +44,10 @@ class ROCKETMQCLIENT_API MQClientConfigProxy : virtual public MQClientConfig  //
   void set_instance_name(const std::string& instanceName) override { client_config_->set_instance_name(instanceName); }
 
   const std::string& unit_name() const override { return client_config_->unit_name(); }
-  void set_unit_name(std::string unitName) override { client_config_->set_unit_name(unitName); }
+  void set_unit_name(const std::string& unitName) override { client_config_->set_unit_name(unitName); }
+
+  const std::string& name_space() const override { return client_config_->name_space(); }
+  void set_name_space(const std::string& name_space) override { client_config_->set_name_space(name_space); }
 
   int tcp_transport_worker_thread_nums() const override { return client_config_->tcp_transport_worker_thread_nums(); }
   void set_tcp_transport_worker_thread_nums(int num) override {
diff --git a/src/MQClientConfigImpl.hpp b/src/MQClientConfigImpl.hpp
index c0aa341..59b4601 100644
--- a/src/MQClientConfigImpl.hpp
+++ b/src/MQClientConfigImpl.hpp
@@ -21,7 +21,7 @@
 #include <thread>     // std::thread::hardware_concurrency
 
 #include "MQClientConfig.h"
-#include "NameSpaceUtil.h"
+#include "NamespaceUtil.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
@@ -67,14 +67,17 @@ class MQClientConfigImpl : virtual public MQClientConfig {
 
   const std::string& namesrv_addr() const override { return namesrv_addr_; }
   void set_namesrv_addr(const std::string& namesrvAddr) override {
-    namesrv_addr_ = NameSpaceUtil::formatNameServerURL(namesrvAddr);
+    namesrv_addr_ = NamespaceUtil::formatNameServerURL(namesrvAddr);
   }
 
   const std::string& instance_name() const override { return instance_name_; }
   void set_instance_name(const std::string& instanceName) override { instance_name_ = instanceName; }
 
   const std::string& unit_name() const override { return unit_name_; }
-  void set_unit_name(std::string unitName) override { unit_name_ = unitName; }
+  void set_unit_name(const std::string& unitName) override { unit_name_ = unitName; }
+
+  const std::string& name_space() const override { return name_space_; }
+  void set_name_space(const std::string& name_space) override { name_space_ = name_space; }
 
   int tcp_transport_worker_thread_nums() const override { return tcp_worker_thread_nums_; }
   void set_tcp_transport_worker_thread_nums(int num) override {
@@ -96,6 +99,7 @@ class MQClientConfigImpl : virtual public MQClientConfig {
   std::string instance_name_;
   std::string group_name_;
   std::string unit_name_;
+  std::string name_space_;
 
   int tcp_worker_thread_nums_;
   uint64_t tcp_connect_timeout;              // ms
diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp
deleted file mode 100644
index f7ceec7..0000000
--- a/src/common/NameSpaceUtil.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "NameSpaceUtil.h"
-
-#include "Logging.h"
-
-namespace rocketmq {
-
-bool NameSpaceUtil::isEndPointURL(std::string nameServerAddr) {
-  if (nameServerAddr.length() >= ENDPOINT_PREFIX_LENGTH && nameServerAddr.find(ENDPOINT_PREFIX) != std::string::npos) {
-    return true;
-  }
-  return false;
-}
-
-std::string NameSpaceUtil::formatNameServerURL(std::string nameServerAddr) {
-  auto index = nameServerAddr.find(ENDPOINT_PREFIX);
-  if (index != std::string::npos) {
-    LOG_DEBUG("Get Name Server from endpoint [%s]",
-              nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH, nameServerAddr.length() - ENDPOINT_PREFIX_LENGTH).c_str());
-    return nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH, nameServerAddr.length() - ENDPOINT_PREFIX_LENGTH);
-  }
-  return nameServerAddr;
-}
-
-}  // namespace rocketmq
diff --git a/src/common/NamespaceUtil.cpp b/src/common/NamespaceUtil.cpp
new file mode 100644
index 0000000..96391c5
--- /dev/null
+++ b/src/common/NamespaceUtil.cpp
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NamespaceUtil.h"
+
+#include "Logging.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+static const char NAMESPACE_SEPARATOR = '%';  // separates the namespace from the resource name: "<namespace>%<resource>"
+static const std::string STRING_BLANK = "";  // NOTE(review): not referenced anywhere in this file — confirm it is needed
+static const size_t RETRY_PREFIX_LENGTH = RETRY_GROUP_TOPIC_PREFIX.length();  // NOTE(review): needs the prefix string initialized first — confirm where it is defined
+static const size_t DLQ_PREFIX_LENGTH = DLQ_GROUP_TOPIC_PREFIX.length();  // same initialization-order assumption as above
+
+static const std::string ENDPOINT_PREFIX = "http://";  // scheme stripped from name server addresses by formatNameServerURL
+static const size_t ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();  // number of leading characters to drop
+
+std::string NamespaceUtil::withoutNamespace(const std::string& resourceWithNamespace) {  // drops "<namespace>%", keeps any retry/DLQ prefix
+  if (resourceWithNamespace.empty() || isSystemResource(resourceWithNamespace)) {
+    return resourceWithNamespace;  // nothing to strip
+  }
+
+  auto resourceWithoutRetryAndDLQ = withoutRetryAndDLQ(resourceWithNamespace);  // look for the separator after the retry/DLQ prefix
+  auto index = resourceWithoutRetryAndDLQ.find(NAMESPACE_SEPARATOR);
+  if (index != std::string::npos && index > 0) {  // find() reports a miss as npos, which a bare "> 0" test lets through
+    auto resourceWithoutNamespace = resourceWithoutRetryAndDLQ.substr(index + 1);  // everything after the first separator
+    if (UtilAll::isRetryTopic(resourceWithNamespace)) {
+      return UtilAll::getRetryTopic(resourceWithoutNamespace);  // put the retry prefix back
+    } else if (UtilAll::isDLQTopic(resourceWithNamespace)) {
+      return UtilAll::getDLQTopic(resourceWithoutNamespace);  // put the DLQ prefix back
+    } else {
+      return resourceWithoutNamespace;
+    }
+  }
+
+  return resourceWithNamespace;  // no separator, or separator at position 0: returned unchanged
+}
+
+std::string NamespaceUtil::withoutNamespace(const std::string& resourceWithNamespace, const std::string& name_space) {  // strips only a matching namespace
+  if (resourceWithNamespace.empty() || name_space.empty()) {
+    return resourceWithNamespace;  // nothing to compare against
+  }
+
+  auto resourceWithoutRetryAndDLQ = withoutRetryAndDLQ(resourceWithNamespace);
+  if (resourceWithoutRetryAndDLQ.find(name_space + NAMESPACE_SEPARATOR) == 0) {  // "<name_space>%" must come right after any retry/DLQ prefix
+    return withoutNamespace(resourceWithNamespace);  // the one-argument overload does the actual stripping
+  }
+
+  return resourceWithNamespace;  // different or missing namespace: returned unchanged
+}
+
+std::string NamespaceUtil::wrapNamespace(const std::string& name_space, const std::string& resourceWithoutNamespace) {  // builds "[prefix]<name_space>%<resource>"
+  if (name_space.empty() || resourceWithoutNamespace.empty()) {
+    return resourceWithoutNamespace;  // nothing to wrap, or nothing to wrap it with
+  }
+
+  // NOTE(review): this guard is disabled, so a resource that already carries the namespace is wrapped again — confirm this is intended.
+  // if (isSystemResource(resourceWithoutNamespace) || isAlreadyWithNamespace(resourceWithoutNamespace, namespace)) {
+  //   return resourceWithoutNamespace; }
+
+  auto resourceWithoutRetryAndDLQ = withoutRetryAndDLQ(resourceWithoutNamespace);  // bare name, retry/DLQ prefix removed
+
+  std::string resourceWithNamespace;
+
+  if (UtilAll::isRetryTopic(resourceWithoutNamespace)) {
+    resourceWithNamespace.append(RETRY_GROUP_TOPIC_PREFIX);  // the retry prefix stays in front of the namespace
+  }
+
+  if (UtilAll::isDLQTopic(resourceWithoutNamespace)) {
+    resourceWithNamespace.append(DLQ_GROUP_TOPIC_PREFIX);  // the DLQ prefix stays in front of the namespace
+  }
+
+  resourceWithNamespace.append(name_space);
+  resourceWithNamespace.push_back(NAMESPACE_SEPARATOR);
+  resourceWithNamespace.append(resourceWithoutRetryAndDLQ);
+
+  return resourceWithNamespace;
+}
+
+std::string NamespaceUtil::withoutRetryAndDLQ(const std::string& originalResource) {  // removes one leading retry or DLQ prefix, if present
+  if (UtilAll::isRetryTopic(originalResource)) {
+    return originalResource.substr(RETRY_PREFIX_LENGTH);  // skip the retry prefix
+  } else if (UtilAll::isDLQTopic(originalResource)) {
+    return originalResource.substr(DLQ_PREFIX_LENGTH);  // skip the DLQ prefix
+  } else {
+    return originalResource;  // neither prefix: returned as a copy
+  }
+}
+
+bool NamespaceUtil::isSystemResource(const std::string& resource) {  // stub: `resource` is not inspected
+  return false;  // currently no resource is treated as a system resource
+}
+
+bool NamespaceUtil::isEndPointURL(const std::string& nameServerAddr) {  // true when the address begins with ENDPOINT_PREFIX
+  return nameServerAddr.find(ENDPOINT_PREFIX) == 0;  // the prefix must be at position 0, not merely somewhere in the string
+}
+
+std::string NamespaceUtil::formatNameServerURL(const std::string& nameServerAddr) {  // returns the address without a leading ENDPOINT_PREFIX
+  if (nameServerAddr.find(ENDPOINT_PREFIX) == 0) {  // only a prefix at the very start is stripped
+    return nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH);
+  }
+  return nameServerAddr;  // no leading prefix: returned unchanged
+}
+
+}  // namespace rocketmq
diff --git a/src/common/NameSpaceUtil.h b/src/common/NamespaceUtil.h
similarity index 62%
rename from src/common/NameSpaceUtil.h
rename to src/common/NamespaceUtil.h
index 2e19c0a..6255d5f 100644
--- a/src/common/NameSpaceUtil.h
+++ b/src/common/NamespaceUtil.h
@@ -21,14 +21,20 @@
 
 namespace rocketmq {
 
-static const std::string ENDPOINT_PREFIX = "http://";
-static const unsigned int ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();
-
-class NameSpaceUtil {
+class NamespaceUtil {
  public:
-  static bool isEndPointURL(std::string nameServerAddr);
+  static std::string withoutNamespace(const std::string& resourceWithNamespace);
+  static std::string withoutNamespace(const std::string& resourceWithNamespace, const std::string& name_space);
+
+  static std::string wrapNamespace(const std::string& name_space, const std::string& resourceWithoutNamespace);
+
+  static std::string withoutRetryAndDLQ(const std::string& originalResource);
+
+  static bool isSystemResource(const std::string& resource);
+
+  static bool isEndPointURL(const std::string& nameServerAddr);
 
-  static std::string formatNameServerURL(std::string nameServerAddr);
+  static std::string formatNameServerURL(const std::string& nameServerAddr);
 };
 
 }  // namespace rocketmq
diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp
index 658695f..90dec5b 100644
--- a/src/common/UtilAll.cpp
+++ b/src/common/UtilAll.cpp
@@ -117,14 +117,22 @@ void UtilAll::string2bytes(char* dest, const std::string& src) {
   }
 }
 
-bool UtilAll::isRetryTopic(const std::string& topic) {
-  return topic.find(RETRY_GROUP_TOPIC_PREFIX) == 0;
+bool UtilAll::isRetryTopic(const std::string& resource) {
+  return resource.find(RETRY_GROUP_TOPIC_PREFIX) == 0;
+}
+
+bool UtilAll::isDLQTopic(const std::string& resource) {
+  return resource.find(DLQ_GROUP_TOPIC_PREFIX) == 0;
 }
 
 std::string UtilAll::getRetryTopic(const std::string& consumerGroup) {
   return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
 }
 
+std::string UtilAll::getDLQTopic(const std::string& consumerGroup) {
+  return DLQ_GROUP_TOPIC_PREFIX + consumerGroup;
+}
+
 std::string UtilAll::getReplyTopic(const std::string& clusterName) {
   return clusterName + "_" + REPLY_TOPIC_POSTFIX;
 }
diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h
index 5193714..8ad49b0 100644
--- a/src/common/UtilAll.h
+++ b/src/common/UtilAll.h
@@ -97,8 +97,12 @@ class UtilAll {
   static std::string bytes2string(const char* bytes, size_t len);
   static void string2bytes(char* dest, const std::string& src);
 
-  static bool isRetryTopic(const std::string& topic);
+  static bool isRetryTopic(const std::string& resource);
+  static bool isDLQTopic(const std::string& resource);
+
   static std::string getRetryTopic(const std::string& consumerGroup);
+  static std::string getDLQTopic(const std::string& consumerGroup);
+
   static std::string getReplyTopic(const std::string& clusterName);
 
   static void Trim(std::string& str);