You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/16 06:08:57 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-282]Create C/C++ return result class (#207)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 5d4e376  [TUBEMQ-282]Create C/C++ return result class (#207)
5d4e376 is described below

commit 5d4e37647a083f8505c33f9dfae894e4f86f6021
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Jul 16 06:08:42 2020 +0000

    [TUBEMQ-282]Create C/C++ return result class (#207)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../include/tubemq/const_config.h                  |  22 ++--
 .../include/tubemq/executor_pool.h                 |   3 +-
 .../include/tubemq/rmt_data_cache.h                |   3 +-
 .../include/tubemq/tubemq_return.h                 |  95 ++++++++++++++
 .../tubemq-client-cpp/include/tubemq/version.h     |   2 +-
 .../tubemq-client-cpp/src/client_subinfo.cc        |   4 +-
 .../tubemq-client-cpp/src/file_ini.cc              |  14 +-
 .../tubemq-client-cpp/src/message.cc               |   4 +-
 .../tubemq-client-cpp/src/meta_info.cc             |   2 +-
 .../tubemq-client-cpp/src/rmt_data_cache.cc        |  19 ++-
 .../tubemq-client-cpp/src/tubemq_return.cc         | 143 +++++++++++++++++++++
 tubemq-client-twins/tubemq-client-cpp/src/utils.cc |   2 +-
 12 files changed, 276 insertions(+), 37 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_config.h
index bc90194..d843929 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_config.h
@@ -76,20 +76,20 @@ static const int32_t kInvalidValue = -2;
 }  // namespace tb_config
 
 namespace delimiter {
-static const string kDelimiterDot = ".";
-static const string kDelimiterEqual = "=";
-static const string kDelimiterAnd = "&";
-static const string kDelimiterComma = ",";
-static const string kDelimiterColon = ":";
-static const string kDelimiterAt = "@";
-static const string kDelimiterPound = "#";
-static const string kDelimiterSemicolon = ";";
+static const char kDelimiterDot[] = ".";
+static const char kDelimiterEqual[] = "=";
+static const char kDelimiterAnd[] = "&";
+static const char kDelimiterComma[] = ",";
+static const char kDelimiterColon[] = ":";
+static const char kDelimiterAt[] = "@";
+static const char kDelimiterPound[] = "#";
+static const char kDelimiterSemicolon[] = ";";
 // Double slash
-static const string kDelimiterDbSlash = "//";
+static const char kDelimiterDbSlash[] = "//";
 // left square bracket
-static const string kDelimiterLftSB = "[";
+static const char kDelimiterLftSB[] = "[";
 // right square bracket
-static const string kDelimiterRgtSB = "]";
+static const char kDelimiterRgtSB[] = "]";
 
 }  // namespace delimiter
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
index a5208a4..404616b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
@@ -85,6 +85,7 @@ class ExecutorPool : noncopyable {
 };
 
 typedef std::shared_ptr<ExecutorPool> ExecutorPoolPtr;
+
 }  // namespace tubemq
 
-#endif  //_TUBEMQ_EXECUTOR_POOL_
+#endif  // _TUBEMQ_EXECUTOR_POOL_
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index 2b79f13..aa8f250 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -48,7 +48,7 @@ class RmtDataCacheCsm {
  public:
   RmtDataCacheCsm(const string& client_id, const string& group_name);
   ~RmtDataCacheCsm();
-  void UpdateDefFlowCtrlInfo(int64_t flowctrl_id, 
+  void UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
                                      const string& flowctrl_info);
   void UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
                  int64_t flowctrl_id, const string& flowctrl_info);
@@ -109,7 +109,6 @@ class RmtDataCacheCsm {
   FlowCtrlRuleHandler def_flowctrl_handler_;
   AtomicBoolean under_groupctrl_;
   AtomicLong last_checktime_;
-
   // meta info
   pthread_rwlock_t meta_rw_lock_;
   // partiton allocated map
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
new file mode 100644
index 0000000..96194b4
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_RETURN_H_
+#define TUBEMQ_CLIENT_RETURN_H_
+
+#include <stdlib.h>
+
+#include <list>
+#include <string>
+
+#include "tubemq/message.h"
+#include "tubemq/meta_info.h"
+
+
+
+
+namespace tubemq {
+
+using std::string;
+
+
+class PeerInfo {
+ public:
+  PeerInfo();
+  PeerInfo(const Partition& partition, int64_t offset);
+  PeerInfo& operator=(const PeerInfo& target);
+  void SetMsgSourceInfo(const Partition& partition, int64_t offset);
+  const uint32_t GetPartitionId() const { return partition_id_; }
+  const string& GetBrokerHost() const { return broker_host_; }
+  const string& GetPartitionKey() const { return partition_key_; }
+  const int64_t GetCurrOffset() const { return curr_offset_; }
+
+ private:
+  uint32_t partition_id_;
+  string broker_host_;
+  string partition_key_;
+  int64_t curr_offset_;
+};
+
+
+
+class ConsumerResult {
+ public:
+  ConsumerResult();
+  ConsumerResult(const ConsumerResult& target);
+  ConsumerResult(int32_t err_code, string err_msg);
+  ~ConsumerResult();
+  ConsumerResult& operator=(const ConsumerResult& target);
+  void SetFailureResult(int32_t err_code, string err_msg);
+  void SetFailureResult(int32_t err_code, string err_msg,
+              const string& topic_name, const PeerInfo& peer_info);
+  void SetSuccessResult(int32_t err_code, const string& topic_name,
+                  const PeerInfo& peer_info, const string& confirm_context,
+                  const list<Message>& message_list);
+  bool IsSuccess() { return success_; }
+  const int32_t  GetErrCode() const { return err_code_; }
+  const string& GetErrMessage() const { return err_msg_; }
+  const string& GetTopicName() const { return topic_name_; }
+  const PeerInfo& GetPeerInfo() const { return peer_info_; }
+  const string& GetConfirmContext() const { return confirm_context_; }
+  const list<Message>& GetMessageList() const { return message_list_; }
+  const string& GetPartitionKey() const;
+  const int64_t GetCurrOffset() const;
+
+ private:
+  bool success_;
+  int32_t  err_code_;
+  string err_msg_;
+  string topic_name_;
+  PeerInfo peer_info_;
+  string confirm_context_;
+  list<Message> message_list_;
+};
+
+}
+
+#endif  // TUBEMQ_CLIENT_RETURN_H_
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/version.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/version.h
index 4aede43..c479ede 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/version.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/version.h
@@ -26,7 +26,7 @@ namespace tubemq {
 
 using std::string;
 
-static const string kTubeMQClientVersion = "0.1.0-0.5.0";
+static const char kTubeMQClientVersion[] = "0.1.0-0.5.0";
 
 }  // namespace tubemq
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
index 0c8064c..9e959e6 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
@@ -62,7 +62,7 @@ void ClientSubInfo::SetConsumeTarget(bool bound_consume,
     } else {
       topic_filter_map_[it_topic->first] = true;
 
-      //build topic conditions
+      // build topic conditions
       count = 0;
       tmpstr = it_topic->first;
       tmpstr += delimiter::kDelimiterPound;
@@ -77,7 +77,7 @@ void ClientSubInfo::SetConsumeTarget(bool bound_consume,
     }
   }
 
-  //build bound_partition info
+  // build bound_partition info
   if (bound_consume) {
     session_key_ = session_key;
     source_count_ = source_count;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
index 346dc10..b073cde 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
@@ -59,6 +59,9 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
   string sector = "";
   string key = "";
   string value = "";
+  string lftsb = delimiter::kDelimiterLftSB;
+  string rgtsb = delimiter::kDelimiterRgtSB;
+  string equal = delimiter::kDelimiterEqual;
   string::size_type lftsb_pos = 0;
   string::size_type rgtsb_pos = 0;
   string::size_type equal_pos = 0;
@@ -71,21 +74,20 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
       continue;
     }
     // check if a sector head
-    lftsb_pos = line_str.find(delimiter::kDelimiterLftSB);
-    rgtsb_pos = line_str.find(delimiter::kDelimiterRgtSB);
+    lftsb_pos = line_str.find(lftsb);
+    rgtsb_pos = line_str.find(rgtsb);
     if (lftsb_pos != string::npos && rgtsb_pos != string::npos) {
-      sector = line_str.substr(lftsb_pos + (delimiter::kDelimiterLftSB).size(),
-                               rgtsb_pos - (delimiter::kDelimiterRgtSB).size());
+      sector = line_str.substr(lftsb_pos + lftsb.size(), rgtsb_pos - rgtsb.size());
       sector = Utils::Trim(sector);
       continue;
     }
     // check if a key=value string
-    equal_pos = line_str.find(delimiter::kDelimiterEqual);
+    equal_pos = line_str.find(equal);
     if (equal_pos == string::npos) {
       continue;
     }
     key = line_str.substr(0, equal_pos);
-    value = line_str.substr(equal_pos + (delimiter::kDelimiterEqual).size(), line_str.size());
+    value = line_str.substr(equal_pos + equal.size(), line_str.size());
     key = Utils::Trim(key);
     value = Utils::Trim(value);
     // get data from file to memory
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/message.cc b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
index 9d117cd..93e3dd2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
@@ -34,9 +34,9 @@ using std::stringstream;
 // message flag's properties settings
 static const int32_t kMsgFlagIncProperties = 0x01;
 // reserved property key Filter Item
-static const string kRsvPropKeyFilterItem = "$msgType$";
+static const char kRsvPropKeyFilterItem[] = "$msgType$";
 // reserved property key message send time
-static const string kRsvPropKeyMsgTime = "$msgTime$";
+static const char kRsvPropKeyMsgTime[] = "$msgTime$";
 
 Message::Message() {
   this->topic_ = "";
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 7694c5b..ae7d72e 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -265,7 +265,7 @@ PartitionExt& PartitionExt::operator=(const PartitionExt& target) {
   if (this != &target) {
     // parent class
     Partition::operator=(target);
-    // child class    
+    // child class
     this->is_last_consumed_ = target.is_last_consumed_;
     this->cur_flowctrl_ = target.cur_flowctrl_;
     this->cur_freqctrl_ = target.cur_freqctrl_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index 2164fa9..ec4876b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -19,9 +19,8 @@
 
 #include "tubemq/rmt_data_cache.h"
 
-#include <string>
-
 #include <stdlib.h>
+#include <string>
 
 #include "tubemq/const_config.h"
 #include "tubemq/meta_info.h"
@@ -57,24 +56,24 @@ RmtDataCacheCsm::~RmtDataCacheCsm() {
   pthread_rwlock_destroy(&meta_rw_lock_);
 }
 
+
 void RmtDataCacheCsm::UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
                                                  const string& flowctrl_info) {
   if (flowctrl_id != def_flowctrl_handler_.GetFlowCtrlId()) {
-    def_flowctrl_handler_.UpdateDefFlowCtrlInfo(true, 
+    def_flowctrl_handler_.UpdateDefFlowCtrlInfo(true,
       tb_config::kInvalidValue, flowctrl_id, flowctrl_info);
   }
 }
 void RmtDataCacheCsm::UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
                              int64_t flowctrl_id, const string& flowctrl_info) {
   if (flowctrl_id != group_flowctrl_handler_.GetFlowCtrlId()) {
-    group_flowctrl_handler_.UpdateDefFlowCtrlInfo(false, 
+    group_flowctrl_handler_.UpdateDefFlowCtrlInfo(false,
                 qyrpriority_id, flowctrl_id, flowctrl_info);
   }
   if (qyrpriority_id != group_flowctrl_handler_.GetQryPriorityId()) {
     this->group_flowctrl_handler_.SetQryPriorityId(qyrpriority_id);
-
   }
-  // update current if under group flowctrl 
+  // update current if under group flowctrl
   int64_t cur_time = Utils::GetCurrentTimeMillis();
   if (cur_time - last_checktime_.Get() > 10000) {
     FlowCtrlResult flowctrl_result;
@@ -176,9 +175,9 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
   return result;
 }
 
-void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, 
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
                      int64_t curr_offset, int32_t err_code, bool esc_limit,
-                     int32_t msg_size,int64_t limit_dlt, int64_t cur_data_dlt,
+                  int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt,
                      bool require_slow) {
   map<string, PartitionExt>::iterator it_part;
   // book partition offset info
@@ -287,7 +286,7 @@ void RmtDataCacheCsm::GetAllBrokerPartitions(
 bool RmtDataCacheCsm::GetPartitionExt(const string& part_key, PartitionExt& partition_ext) {
   bool result = false;
   map<string, PartitionExt>::iterator it_map;
-  
+
   pthread_rwlock_rdlock(&meta_rw_lock_);
   it_map = partitions_.find(part_key);
   if (it_map != partitions_.end()) {
@@ -314,7 +313,7 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
   set<string>::iterator it_key;
   map<NodeInfo, set<string> >::iterator it_broker;
   map<string, PartitionExt>::iterator it_part;
-  
+
   partition_list.clear();
   pthread_rwlock_rdlock(&meta_rw_lock_);
   it_broker = broker_partition_.find(broker_info);
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
new file mode 100644
index 0000000..5298606
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tubemq/tubemq_return.h"
+#include "tubemq/const_config.h"
+
+
+
+namespace tubemq {
+
+
+PeerInfo::PeerInfo() {
+  broker_host_ = "";
+  partition_key_ = "";
+  partition_id_ = 0;
+  curr_offset_ = tb_config::kInvalidValue;
+}
+
+PeerInfo::PeerInfo(const Partition& partition, int64_t offset) {
+  SetMsgSourceInfo(partition, offset);
+}
+
+PeerInfo& PeerInfo::operator=(const PeerInfo& target) {
+  if (this != &target) {
+    this->partition_id_ = target.partition_id_;
+    this->broker_host_ = target.broker_host_;
+    this->partition_key_ = target.partition_key_;
+    this->curr_offset_ = target.curr_offset_;
+  }
+  return *this;
+}
+
+void PeerInfo::SetMsgSourceInfo(const Partition& partition, int64_t offset) {
+  partition_id_ = partition.GetPartitionId();
+  broker_host_ = partition.GetBrokerHost();
+  partition_key_ = partition.GetPartitionKey();
+  curr_offset_ = offset;
+}
+
+ConsumerResult::ConsumerResult() {
+  success_ = false;
+  err_code_ = tb_config::kInvalidValue;
+  err_msg_ = "";
+  topic_name_ = "";
+  confirm_context_ = "";
+}
+
+ConsumerResult::ConsumerResult(const ConsumerResult& target) {
+  this->success_ = target.success_;
+  this->err_code_ = target.err_code_;
+  this->err_msg_ = target.err_msg_;
+  this->topic_name_ = target.topic_name_;
+  this->peer_info_ = target.peer_info_;
+  this->confirm_context_ = target.confirm_context_;
+  this->message_list_ = target.message_list_;
+}
+
+ConsumerResult::ConsumerResult(int32_t err_code, string err_msg) {
+  success_ = false;
+  err_code_ = err_code;
+  err_msg_ = err_msg;
+  topic_name_ = "";
+  confirm_context_ = "";
+}
+
+ConsumerResult::~ConsumerResult() {
+  this->message_list_.clear();
+  success_ = false;
+  err_code_ = tb_config::kInvalidValue;
+  err_msg_ = "";
+  topic_name_ = "";
+  confirm_context_ = "";
+}
+
+ConsumerResult& ConsumerResult::operator=(const ConsumerResult& target) {
+  if (this != &target) {
+    this->success_ = target.success_;
+    this->err_code_ = target.err_code_;
+    this->err_msg_ = target.err_msg_;
+    this->topic_name_ = target.topic_name_;
+    this->peer_info_ = target.peer_info_;
+    this->confirm_context_ = target.confirm_context_;
+    this->message_list_ = target.message_list_;
+  }
+  return *this;
+}
+
+void ConsumerResult::SetFailureResult(int32_t err_code, string err_msg) {
+  success_ = false;
+  err_code_ = err_code;
+  err_msg_ = err_msg;
+}
+
+void ConsumerResult::SetFailureResult(int32_t err_code, string err_msg,
+                            const string& topic_name, const PeerInfo& peer_info) {
+  success_ = false;
+  err_code_ = err_code;
+  err_msg_ = err_msg;
+  topic_name_ = topic_name;
+  peer_info_ = peer_info;
+}
+
+void ConsumerResult::SetSuccessResult(int32_t err_code,
+                                             const string& topic_name,
+                                             const PeerInfo& peer_info,
+                                             const string& confirm_context,
+                                             const list<Message>& message_list) {
+  this->success_ = true;
+  this->err_code_ = err_code;
+  this->err_msg_ = "Ok";
+  this->topic_name_ = topic_name;
+  this->peer_info_ = peer_info;
+  this->confirm_context_ = confirm_context;
+  this->message_list_ = message_list;
+}
+
+const string& ConsumerResult::GetPartitionKey() const {
+  return this->peer_info_.GetPartitionKey();
+}
+
+const int64_t ConsumerResult::GetCurrOffset() const {
+  return this->peer_info_.GetCurrOffset();
+}
+
+
+}  // namespace tubemq
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index e48c8b6..40d1de5 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -34,7 +34,7 @@ namespace tubemq {
 using std::stringstream;
 
 
-static const string kWhitespaceCharSet = " \n\r\t\f\v";
+static const char kWhitespaceCharSet[] = " \n\r\t\f\v";
 
 string Utils::Trim(const string& source) {
   string target = source;