You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/16 14:28:38 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-283]Adjust C/C++ some file names: add "tubemq_" prefix (#208)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new 28f48e0 [TUBEMQ-283]Adjust C/C++ some file names: add "tubemq_" prefix (#208)
28f48e0 is described below
commit 28f48e0d0e3d08344f3ad4a3e92379a056150203
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Jul 16 14:28:24 2020 +0000
[TUBEMQ-283]Adjust C/C++ some file names: add "tubemq_" prefix (#208)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../include/tubemq/rmt_data_cache.h | 20 ++++--
.../tubemq/{client_config.h => tubemq_config.h} | 1 +
.../tubemq/{const_errcode.h => tubemq_errcode.h} | 0
.../include/tubemq/{message.h => tubemq_message.h} | 0
.../include/tubemq/tubemq_return.h | 2 +-
.../tubemq-client-cpp/include/tubemq/utils.h | 2 +-
.../tubemq-client-cpp/src/meta_info.cc | 2 +-
.../tubemq-client-cpp/src/rmt_data_cache.cc | 84 ++++++++++++++--------
.../src/{client_config.cc => tubemq_config.cc} | 4 +-
.../src/{message.cc => tubemq_message.cc} | 2 +-
tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 2 +-
11 files changed, 76 insertions(+), 43 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index aa8f250..af12ce4 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -27,10 +27,13 @@
#include <map>
#include <set>
#include <string>
+#include <tuple>
#include "tubemq/atomic_def.h"
#include "tubemq/flowctrl_def.h"
#include "tubemq/meta_info.h"
+#include "tubemq/executor_pool.h"
+#include "tubemq/tubemq_errcode.h"
@@ -41,13 +44,17 @@ namespace tubemq {
using std::map;
using std::set;
using std::list;
+using std::string;
+using std::tuple;
+
// consumer remote data cache
class RmtDataCacheCsm {
public:
- RmtDataCacheCsm(const string& client_id, const string& group_name);
+ RmtDataCacheCsm();
~RmtDataCacheCsm();
+ void SetConsumerInfo(const string& client_id, const string& group_name);
void UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
const string& flowctrl_info);
void UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
@@ -87,8 +94,11 @@ class RmtDataCacheCsm {
void ClearEvent();
void OfferEventResult(const ConsumerEvent& event);
bool PollEventResult(ConsumerEvent& event);
+ void HandleTimeout(const string partition_key, const asio::error_code& error);
private:
+ void addDelayTimer(const string& part_key, int64_t delay_time);
+ void resetIdlePartition(const string& partition_key, bool need_reuse);
void rmvMetaInfo(const string& partition_key);
void buildConfirmContext(const string& partition_key,
int64_t booked_time, string& confirm_context);
@@ -99,9 +109,9 @@ class RmtDataCacheCsm {
private:
- // timer begin
-
- // timer end
+ // timer executor
+ ExecutorPtr executor_;
+ //
string consumer_id_;
string group_name_;
// flow ctrl
@@ -125,7 +135,7 @@ class RmtDataCacheCsm {
// for partition used map
map<string, int64_t> partition_useds_;
// for partition timer map
- map<string, int64_t> partition_timeouts_;
+ map<string, tuple<int64_t, SteadyTimerPtr> > partition_timeouts_;
// data book
pthread_mutex_t data_book_mutex_;
// for partition offset cache
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
similarity index 99%
rename from tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
rename to tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
index fecbeb2..df3f09c 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
@@ -33,6 +33,7 @@ using std::map;
using std::set;
using std::string;
+
class BaseConfig {
public:
BaseConfig();
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_errcode.h
similarity index 100%
rename from tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h
rename to tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_errcode.h
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_message.h
similarity index 100%
rename from tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h
rename to tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_message.h
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
index 96194b4..2f35b22 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
@@ -25,7 +25,7 @@
#include <list>
#include <string>
-#include "tubemq/message.h"
+#include "tubemq/tubemq_message.h"
#include "tubemq/meta_info.h"
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
index b4e0aeb..a25832f 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
@@ -39,7 +39,7 @@ class Utils {
// split string to vector
static void Split(const string& source, vector<string>& result, const string& delimiter);
// split string to map<string, int>
- static void Split(const string& source, map<string, int>& result, const string& delimiter_step1,
+ static void Split(const string& source, map<string, int32_t>& result, const string& delimiter_step1,
const string& delimiter_step2);
static void Join(const vector<string>& vec, const string& delimiter, string& target);
static bool ValidString(string& err_info, const string& source, bool allow_empty, bool pat_match,
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index ae7d72e..55e8cdc 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -25,7 +25,7 @@
#include <vector>
#include "tubemq/const_config.h"
-#include "tubemq/const_errcode.h"
+#include "tubemq/tubemq_errcode.h"
#include "tubemq/utils.h"
namespace tubemq {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index ec4876b..8b1e76b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -33,10 +33,7 @@ namespace tubemq {
-RmtDataCacheCsm::RmtDataCacheCsm(const string& client_id,
- const string& group_name) {
- consumer_id_ = client_id;
- group_name_ = group_name;
+RmtDataCacheCsm::RmtDataCacheCsm() {
under_groupctrl_.Set(false);
last_checktime_.Set(0);
pthread_rwlock_init(&meta_rw_lock_, NULL);
@@ -56,6 +53,11 @@ RmtDataCacheCsm::~RmtDataCacheCsm() {
pthread_rwlock_destroy(&meta_rw_lock_);
}
+void RmtDataCacheCsm::SetConsumerInfo(const string& client_id,
+ const string& group_name) {
+ consumer_id_ = client_id;
+ group_name_ = group_name;
+}
void RmtDataCacheCsm::UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
const string& flowctrl_info) {
@@ -129,11 +131,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
}
// check partition_key status
pthread_mutex_lock(&part_mutex_);
- if (partition_useds_.find(partition_key) == partition_useds_.end()
- && partition_timeouts_.find(partition_key) == partition_timeouts_.end()) {
- index_partitions_.remove(partition_key);
- index_partitions_.push_back(partition_key);
- }
+ resetIdlePartition(partition_key, true);
pthread_mutex_unlock(&part_mutex_);
pthread_rwlock_unlock(&meta_rw_lock_);
}
@@ -347,9 +345,7 @@ bool RmtDataCacheCsm::RemovePartition(string &err_info,
const string& confirm_context) {
int64_t booked_time;
string partition_key;
- map<string, PartitionExt>::iterator it_part;
- map<string, set<string> >::iterator it_topic;
- map<NodeInfo, set<string> >::iterator it_broker;
+ set<string> partition_keys;
// parse confirm context
bool result = parseConfirmContext(err_info,
confirm_context, partition_key, booked_time);
@@ -357,7 +353,6 @@ bool RmtDataCacheCsm::RemovePartition(string &err_info,
return false;
}
// remove partition
- set<string> partition_keys;
partition_keys.insert(partition_key);
RemovePartition(partition_keys);
err_info = "Ok";
@@ -381,11 +376,7 @@ void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) {
pthread_rwlock_wrlock(&meta_rw_lock_);
for (it_lst = partition_keys.begin(); it_lst != partition_keys.end(); it_lst++) {
pthread_mutex_lock(&part_mutex_);
- partition_useds_.erase(*it_lst);
- index_partitions_.remove(*it_lst);
- // todo need modify if timer build finished
- partition_timeouts_.erase(*it_lst);
- // end todo
+ resetIdlePartition(*it_lst, false);
pthread_mutex_unlock(&part_mutex_);
// remove meta info set info
rmvMetaInfo(*it_lst);
@@ -427,13 +418,9 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
} else {
it_broker->second.push_back(it_part->second);
}
- rmvMetaInfo(part_key);
+ rmvMetaInfo(part_key);
}
- partition_useds_.erase(part_key);
- index_partitions_.remove(part_key);
- // todo need modify if timer build finished
- partition_timeouts_.erase(part_key);
- // end todo
+ resetIdlePartition(part_key, false);
}
pthread_mutex_unlock(&part_mutex_);
pthread_rwlock_unlock(&meta_rw_lock_);
@@ -494,6 +481,43 @@ bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
return result;
}
+void RmtDataCacheCsm::HandleTimeout(const string partition_key,
+ const asio::error_code& error) {
+ if (!error) {
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ pthread_mutex_lock(&part_mutex_);
+ resetIdlePartition(partition_key, true);
+ pthread_mutex_unlock(&part_mutex_);
+ pthread_rwlock_unlock(&meta_rw_lock_);
+ }
+}
+
+void RmtDataCacheCsm::addDelayTimer(const string& partition_key, int64_t delay_time) {
+ // add timer
+ tuple<int64_t, SteadyTimerPtr> timer =
+ std::make_tuple(Utils::GetCurrentTimeMillis(), executor_.Get()->CreateSteadyTimer());
+ std::get<1>(timer)->expires_after(std::chrono::milliseconds(delay_time));
+ std::get<1>(timer)->async_wait(std::bind(&RmtDataCacheCsm::HandleTimeout, partition_key, _1));
+ partition_timeouts_.insert(std::make_pair(partition_key, timer));
+}
+
+void RmtDataCacheCsm::resetIdlePartition(const string& partition_key, bool need_reuse) {
+ map<string, PartitionExt>::iterator it_map;
+ map<string, tuple<int64_t, SteadyTimerPtr> >::iterator it_timeout;
+ partition_useds_.erase(partition_key);
+ it_timeout = partition_timeouts_.find(partition_key);
+ if (it_timeout != partition_timeouts_.end()) {
+ std::get<1>(it_timeout->second)->cancel();
+ partition_timeouts_.erase(partition_key);
+ }
+ index_partitions_.remove(partition_key);
+ if (need_reuse) {
+ if (partitions_.find(partition_key) != partitions_.end()) {
+ index_partitions_.push_back(partition_key);
+ }
+ }
+}
+
void RmtDataCacheCsm::buildConfirmContext(const string& partition_key,
int64_t booked_time, string& confirm_context) {
confirm_context.clear();
@@ -544,7 +568,7 @@ void RmtDataCacheCsm::rmvMetaInfo(const string& partition_key) {
bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
bool filter_consume, const string& confirm_context, bool is_consumed) {
- int64_t wait_time;
+ int64_t delay_time;
int64_t booked_time;
string partition_key;
map<string, PartitionExt>::iterator it_part;
@@ -577,14 +601,13 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
// wait release
partition_useds_.erase(partition_key);
index_partitions_.remove(partition_key);
- wait_time = 0;
+ delay_time = 0;
if (need_delay_check) {
- wait_time = it_part->second.ProcConsumeResult(def_flowctrl_handler_,
+ delay_time = it_part->second.ProcConsumeResult(def_flowctrl_handler_,
group_flowctrl_handler_, filter_consume, is_consumed);
}
- if (wait_time >= 10) {
- // todo add timer
- // end todo
+ if (delay_time > 10) {
+ addDelayTimer(partition_key, delay_time);
} else {
index_partitions_.push_back(partition_key);
}
@@ -603,5 +626,4 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
}
-
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
similarity index 96%
rename from tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
rename to tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
index dfd5b8c..b976add 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "tubemq/client_config.h"
+#include "tubemq/tubemq_config.h"
#include <sstream>
#include <vector>
@@ -84,7 +84,7 @@ bool BaseConfig::SetMasterAddrInfo(string& err_info, const string& master_addrin
}
// parse and verify master address info
// master_addrinfo's format like ip1:port1,ip2:port2,ip3:port3
- map<string, int> tgt_address_map;
+ map<string, int32_t> tgt_address_map;
Utils::Split(master_addrinfo, tgt_address_map, delimiter::kDelimiterComma,
delimiter::kDelimiterColon);
if (tgt_address_map.empty()) {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/message.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
similarity index 96%
rename from tubemq-client-twins/tubemq-client-cpp/src/message.cc
rename to tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
index 93e3dd2..f914741 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "tubemq/message.h"
+#include "tubemq/tubemq_message.h"
#include <string.h>
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index 40d1de5..2fbaecd 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -76,7 +76,7 @@ void Utils::Split(const string& source, vector<string>& result, const string& de
}
}
-void Utils::Split(const string& source, map<string, int>& result, const string& delimiter_step1,
+void Utils::Split(const string& source, map<string, int32_t>& result, const string& delimiter_step1,
const string& delimiter_step2) {
string item_str;
string key_str;