You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/01 03:24:28 UTC
[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-250]
Create C/C++ configure files
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new ca21347 [TUBEMQ-250] Create C/C++ configure files
ca21347 is described below
commit ca213474f4b0ce7d71a18ae91b4b2dfd4b20bc0f
Author: gosonzhang <go...@tencent.com>
AuthorDate: Wed Jul 1 11:24:01 2020 +0800
[TUBEMQ-250] Create C/C++ configure files
---
.../tubemq-client-cpp/inc/client_config.h | 72 +++++++++++++++++++++-
tubemq-client-twins/tubemq-client-cpp/inc/utils.h | 17 +++--
.../tubemq-client-cpp/inc/version.h | 2 +-
.../tubemq-client-cpp/src/client_config.cc | 67 ++++++++++++++++++--
tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 21 ++++---
5 files changed, 151 insertions(+), 28 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/client_config.h b/tubemq-client-twins/tubemq-client-cpp/inc/client_config.h
index 71f2c0e..576a8e1 100644
--- a/tubemq-client-twins/tubemq-client-cpp/inc/client_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/client_config.h
@@ -23,6 +23,7 @@
#include <string>
#include <stdio.h>
#include <map>
+#include <set>
@@ -47,6 +48,16 @@ static const int kMasterAddrInfoMaxLength = 1024;
static const int kTopicNameMaxLength = 64;
// max Consume GroupName length
static const int kGroupNameMaxLength = 1024;
+// max subscribe info report times
+static const int kSubInfoReportMaxIntervalTimes = 6;
+// default message not found response wait period
+static const int kMsgNotfoundWaitPeriodMsDef = 200;
+// default confirm wait period if rebalance meeting
+static const int kRebConfirmWaitPeriodMsDef = 3000;
+// max confirm wait period anyway
+static const int kConfirmWaitPeriodMsMax = 60000;
+// default rebalance wait if shutdown meeting
+static const int kRebWaitPeriodWhenShutdownMs = 10000;
} // namespace config
@@ -97,11 +108,68 @@ class BaseConfig {
};
-class ConsumerConfig {
- public:
+enum ConsumePosition {
+ kConsumeFromFirstOffset = -1,
+ kConsumeFromLatestOffset = 0,
+ kComsumeFromMaxOffsetAlways = 1
+};
+
+
+
+class ConsumerConfig : public BaseConfig {
+ public:
ConsumerConfig();
+ ~ConsumerConfig();
+ ConsumerConfig& operator=(const ConsumerConfig& target);
+ bool SetGroupConsumeTarget(string& err_info,
+ const string& group_name, const set<string>& subscribed_topicset);
+ bool SetGroupConsumeTarget(string& err_info,
+ const string& group_name, const map<string, set<string> >& subscribed_topic_and_filter_map);
+ bool SetGroupConsumeTarget(string& err_info,
+ const string& group_name, const map<string, set<string> >& subscribed_topic_and_filter_map,
+ const string& session_key, int source_count, bool is_select_big, const map<string, long>& part_offset_map);
+ const string& GetGroupName() const;
+ const map<string, set<string> >& GetSubTopicAndFilterMap() const;
+ void SetConsumePosition(ConsumePosition consume_from_where);
+ const ConsumePosition GetConsumePosition() const;
+ const int GetMsgNotFoundWaitPeriodMs() const;
+ void SetMsgNotFoundWaitPeriodMs(int msg_notfound_wait_period_ms);
+ const int GetMaxSubinfoReportIntvl() const;
+ void SetMaxSubinfoReportIntvl(int max_subinfo_report_intvl);
+ bool IsConfirmInLocal();
+ void SetConfirmInLocal(bool confirm_in_local);
+ bool IsRollbackIfConfirmTimeout();
+ void setRollbackIfConfirmTimeout(bool is_rollback_if_confirm_timeout);
+ const int GetWaitPeriodIfConfirmWaitRebalanceMs() const;
+ void SetWaitPeriodIfConfirmWaitRebalanceMs(int reb_confirm_wait_period_ms);
+ const int GetMaxConfirmWaitPeriodMs() const;
+ void SetMaxConfirmWaitPeriodMs(int max_confirm_wait_period_ms);
+ const int GetShutdownRebWaitPeriodMs() const;
+ void SetShutdownRebWaitPeriodMs(int wait_period_when_shutdown_ms);
+ string ToString();
+
+
+ private:
+ string group_name_;
+ map<string, set<string> > sub_topic_and_filter_map_;
+ bool is_bound_consume_;
+ string session_key_;
+ int source_count_;
+ bool is_select_big_;
+ map<string, long> part_offset_map_;
+ ConsumePosition consume_position_;
+ int max_subinfo_report_intvl_;
+ int msg_notfound_wait_period_ms_;
+ bool is_confirm_in_local_;
+ bool is_rollback_if_confirm_timout_;
+ int reb_confirm_wait_period_ms_;
+ int max_confirm_wait_period_ms_;
+ int shutdown_reb_wait_period_ms_;
};
+
+
+
}
#endif
diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/utils.h b/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
index 937eb37..e8df420 100644
--- a/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
@@ -27,23 +27,22 @@ namespace tubemq {
using namespace std;
-static const string tWhitespaceCharSet = " \n\r\t\f\v";
namespace delimiter {
- static const string tDelimiterEqual = "=";
- static const string tDelimiterAnd = "&";
- static const string tDelimiterComma = ",";
- static const string tDelimiterColon = ":";
- static const string tDelimiterAt = "@";
- static const string tDelimiterPound = "#";
+ static const string kDelimiterEqual = "=";
+ static const string kDelimiterAnd = "&";
+ static const string kDelimiterComma = ",";
+ static const string kDelimiterColon = ":";
+ static const string kDelimiterAt = "@";
+ static const string kDelimiterPound = "#";
}
class Utils {
public:
// trim string info
- static string trim(const string& source);
+ static string Trim(const string& source);
// split string to vector
- static void split(const string& source, map<string, int>& result,
+ static void Split(const string& source, map<string, int>& result,
const string& delimiter_step1, const string& delimiter_step2);
};
diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/version.h b/tubemq-client-twins/tubemq-client-cpp/inc/version.h
index cdf1444..e7ccc62 100644
--- a/tubemq-client-twins/tubemq-client-cpp/inc/version.h
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/version.h
@@ -26,7 +26,7 @@ namespace tubemq {
using namespace std;
-static const String tTubeMQClientVersion = "0.5.0";
+static const String kTubeMQClientVersion = "0.1.0-0.5.0";
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
index 2f540fa..822c0e5 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
@@ -62,7 +62,7 @@ BaseConfig& BaseConfig::operator=(const BaseConfig& target) {
bool BaseConfig::SetMasterAddrInfo(string& err_info, const string& master_addrinfo) {
// check parameter masterAddrInfo
- string trimed_master_addr_info = Utils::trim(master_addrinfo);
+ string trimed_master_addr_info = Utils::Trim(master_addrinfo);
if(trimed_master_addr_info.empty()) {
err_info = "Illegal parameter: master_addrinfo is blank!";
return false;
@@ -78,8 +78,8 @@ bool BaseConfig::SetMasterAddrInfo(string& err_info, const string& master_addrin
// parse and verify master address info
// master_addrinfo's format like ip1:port1,ip2:port2,ip3:port3
map<string, int> tgt_address_map;
- Utils::split(master_addrinfo, tgt_address_map,
- delimiter::tDelimiterComma, delimiter::tDelimiterColon);
+ Utils::Split(master_addrinfo, tgt_address_map,
+ delimiter::kDelimiterComma, delimiter::kDelimiterColon);
if(tgt_address_map.empty()) {
err_info = "Illegal parameter: master_addrinfo is blank!";
return false;
@@ -93,12 +93,12 @@ bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable,
const string& trust_store_path, const string& trust_store_password) {
this->tls_enabled_ = tls_enable;
if(tls_enable) {
- string trimed_trust_store_path = Utils::trim(trust_store_path);
+ string trimed_trust_store_path = Utils::Trim(trust_store_path);
if(trimed_trust_store_path.empty()) {
err_info = "Illegal parameter: trust_store_path is empty!";
return false;
}
- string trimed_trust_store_password = Utils::trim(trust_store_password);
+ string trimed_trust_store_password = Utils::Trim(trust_store_password);
if(trimed_trust_store_password.empty()) {
err_info = "Illegal parameter: trust_store_password is empty!";
return false;
@@ -117,7 +117,7 @@ bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable,
const string& usr_name, const string& usr_password) {
this->auth_enable_ = authentic_enable;
if(authentic_enable) {
- string trimed_usr_name = Utils::trim(usr_name);
+ string trimed_usr_name = Utils::Trim(usr_name);
if(trimed_usr_name.empty()) {
err_info = "Illegal parameter: usr_name is empty!";
return false;
@@ -232,6 +232,61 @@ string BaseConfig::ToString() {
}
+ ConsumerConfig::ConsumerConfig() {
+ this->group_name_ = "";
+ this->is_bound_consume_ = false;
+ this->session_key_ = "";
+ this->source_count_ = -1;
+ this->is_select_big_ = true;
+ this->consume_position_ = kConsumeFromLatestOffset;
+ this->is_confirm_in_local_ = false;
+ this->is_rollback_if_confirm_timout_ = true;
+ this->max_subinfo_report_intvl_ = config::kSubInfoReportMaxIntervalTimes;
+ this->msg_notfound_wait_period_ms_ = config::kMsgNotfoundWaitPeriodMsDef;
+ this->reb_confirm_wait_period_ms_ = config::kRebConfirmWaitPeriodMsDef;
+ this->max_confirm_wait_period_ms_ = config::kConfirmWaitPeriodMsMax;
+ this->shutdown_reb_wait_period_ms_ = config::kRebWaitPeriodWhenShutdownMs;
+ }
+
+ ConsumerConfig::~ConsumerConfig() {
+
+ }
+
+ConsumerConfig& ConsumerConfig::operator=(const ConsumerConfig& target) {
+ if(this != &target) {
+ // parent class
+ this->master_addrinfo_ = target.master_addrinfo_;
+ this->auth_enable_ = target.auth_enable_;
+ this->auth_usrname_ = target.auth_usrname_;
+ this->auth_usrpassword_ = target.auth_usrpassword_;
+ this->tls_enabled_ = target.tls_enabled_;
+ this->tls_trust_store_path_ = target.tls_trust_store_path_;
+ this->tls_trust_store_password_ = target.tls_trust_store_password_;
+ this->rpc_read_timeout_sec_ = target.rpc_read_timeout_sec_;
+ this->heartbeat_period_sec_ = target.heartbeat_period_sec_;
+ this->max_heartbeat_retry_times_ = target.max_heartbeat_retry_times_;
+ this->heartbeat_period_afterfail_sec_ = target.heartbeat_period_afterfail_sec_;
+ // child class
+ this->group_name_ = target.group_name_;
+ this->sub_topic_and_filter_map_ = target.sub_topic_and_filter_map_;
+ this->is_bound_consume_ = target.is_bound_consume_;
+ this->session_key_ = target.session_key_;
+ this->source_count_ = target.source_count_;
+ this->is_select_big_ = target.is_select_big_;
+ this->part_offset_map_ = target.part_offset_map_;
+ this->consume_position_ = target.consume_position_;
+ this->max_subinfo_report_intvl_ = target.max_subinfo_report_intvl_;
+ this->msg_notfound_wait_period_ms_ = target.msg_notfound_wait_period_ms_;
+ this->is_confirm_in_local_ = target.is_confirm_in_local_;
+ this->is_rollback_if_confirm_timout_ = target.is_rollback_if_confirm_timout_;
+ this->reb_confirm_wait_period_ms_ = target.reb_confirm_wait_period_ms_;
+ this->max_confirm_wait_period_ms_ = target.max_confirm_wait_period_ms_;
+ this->shutdown_reb_wait_period_ms_ = target.shutdown_reb_wait_period_ms_;
+
+ }
+ return *this;
+}
+
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index c3d1ca7..8b2d08f 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -23,15 +23,16 @@
namespace tubemq {
+static const string kWhitespaceCharSet = " \n\r\t\f\v";
-string Utils::trim(const string& source) {
+string Utils::Trim(const string& source) {
string target = source;
if(!target.empty()) {
- size_t foud_pos = target.find_first_not_of(tWhitespaceCharSet);
+ size_t foud_pos = target.find_first_not_of(kWhitespaceCharSet);
if (foud_pos != string::npos) {
target = target.substr(foud_pos);
}
- foud_pos = target.find_last_not_of(tWhitespaceCharSet);
+ foud_pos = target.find_last_not_of(kWhitespaceCharSet);
if(foud_pos != string::npos) {
target = target.substr(0, foud_pos + 1);
}
@@ -39,7 +40,7 @@ string Utils::trim(const string& source) {
return target;
}
-void Utils::split(const string& source, map<string, int>& result,
+void Utils::Split(const string& source, map<string, int>& result,
const string& delimiter_step1, const string& delimiter_step2) {
string item_str;
string key_str;
@@ -50,7 +51,7 @@ void Utils::split(const string& source, map<string, int>& result,
pos2 = source.find(delimiter_step1);
while(string::npos != pos2) {
item_str = source.substr(pos1, pos2-pos1);
- item_str = Utils::trim(item_str);
+ item_str = Utils::Trim(item_str);
pos1 = pos2 + delimiter_step1.length();
pos2 = source.find(delimiter_step1, pos1);
if(item_str.empty()) {
@@ -62,8 +63,8 @@ void Utils::split(const string& source, map<string, int>& result,
}
key_str = item_str.substr(0, pos3);
val_str = item_str.substr(pos3+delimiter_step2.length());
- key_str = Utils::trim(key_str);
- val_str = Utils::trim(val_str);
+ key_str = Utils::Trim(key_str);
+ val_str = Utils::Trim(val_str);
if(key_str.empty()) {
continue;
}
@@ -71,13 +72,13 @@ void Utils::split(const string& source, map<string, int>& result,
}
if(pos1 != source.length()) {
item_str = source.substr(pos1);
- item_str = Utils::trim(item_str);
+ item_str = Utils::Trim(item_str);
pos3 = item_str.find(delimiter_step2);
if(string::npos != pos3) {
key_str = item_str.substr(0, pos3);
val_str = item_str.substr(pos3+delimiter_step2.length());
- key_str = Utils::trim(key_str);
- val_str = Utils::trim(val_str);
+ key_str = Utils::Trim(key_str);
+ val_str = Utils::Trim(val_str);
if(!key_str.empty()){
result[key_str] = atoi(val_str.c_str());
}