You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/17 11:46:21 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-285]Replace C/C++ pthread's mutex to std::mutex (#210)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new a7ece60 [TUBEMQ-285]Replace C/C++ pthread's mutex to std::mutex (#210)
a7ece60 is described below
commit a7ece606488fcc0cea0ac7ade03fe4d4cd9a87ef
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Jul 17 11:46:14 2020 +0000
[TUBEMQ-285]Replace C/C++ pthread's mutex to std::mutex (#210)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../include/tubemq/flowctrl_def.h | 16 ++--
.../include/tubemq/rmt_data_cache.h | 19 ++--
.../include/tubemq/tubemq_config.h | 5 +
.../tubemq-client-cpp/src/flowctrl_def.cc | 71 ++++++++------
.../tubemq-client-cpp/src/meta_info.cc | 4 +-
.../tubemq-client-cpp/src/rmt_data_cache.cc | 102 ++++++---------------
6 files changed, 101 insertions(+), 116 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
index c0289ca..8b99cbd 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
@@ -26,6 +26,7 @@
#include <algorithm>
#include <list>
#include <map>
+#include <mutex>
#include <string>
#include <vector>
@@ -34,6 +35,7 @@
namespace tubemq {
using std::map;
+using std::mutex;
using std::string;
using std::vector;
@@ -94,12 +96,12 @@ class FlowCtrlRuleHandler {
const string& flowctrl_info);
bool GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result) const;
int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit) const;
+ void GetFilterCtrlItem(FlowCtrlItem& result) const;
+ void GetFlowCtrlInfo(string& flowctrl_info) const;
int32_t GetMinZeroCnt() const { return this->min_zero_cnt_.Get(); }
int32_t GetQryPriorityId() const { return this->qrypriority_id_.Get(); }
void SetQryPriorityId(int32_t qrypriority_id) { this->qrypriority_id_.Set(qrypriority_id); }
int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); }
- const FlowCtrlItem& GetFilterCtrlItem() const { return this->filter_ctrl_item_; }
- const string& GetFlowCtrlInfo() const { return this->flowctrl_info_; }
private:
void initialStatisData();
@@ -124,17 +126,17 @@ class FlowCtrlRuleHandler {
int32_t& value);
private:
+ mutable mutex config_lock_;
+ string flowctrl_info_;
+ FlowCtrlItem filter_ctrl_item_;
+ map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_;
+ int64_t last_update_time_;
AtomicLong flowctrl_id_;
AtomicInteger qrypriority_id_;
- string flowctrl_info_;
AtomicInteger min_zero_cnt_;
AtomicLong min_datadlt_limt_;
AtomicInteger datalimit_start_time_;
AtomicInteger datalimit_end_time_;
- FlowCtrlItem filter_ctrl_item_;
- map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_;
- pthread_rwlock_t configrw_lock_;
- int64_t last_update_time_;
};
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index af12ce4..98f192e 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -20,11 +20,12 @@
#ifndef TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
#define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
-#include <pthread.h>
#include <stdint.h>
+#include <condition_variable>
#include <list>
#include <map>
+#include <mutex>
#include <set>
#include <string>
#include <tuple>
@@ -41,9 +42,11 @@
namespace tubemq {
+using std::condition_variable;
using std::map;
using std::set;
using std::list;
+using std::mutex;
using std::string;
using std::tuple;
@@ -110,7 +113,7 @@ class RmtDataCacheCsm {
private:
// timer executor
- ExecutorPtr executor_;
+ ExecutorPool executor_;
//
string consumer_id_;
string group_name_;
@@ -120,7 +123,7 @@ class RmtDataCacheCsm {
AtomicBoolean under_groupctrl_;
AtomicLong last_checktime_;
// meta info
- pthread_rwlock_t meta_rw_lock_;
+ mutable mutex meta_lock_;
// partiton allocated map
map<string, PartitionExt> partitions_;
// topic partiton map
@@ -129,25 +132,23 @@ class RmtDataCacheCsm {
map<NodeInfo, set<string> > broker_partition_;
map<string, SubscribeInfo> part_subinfo_;
// for idle partitions occupy
- pthread_mutex_t part_mutex_;
- // for partiton idle map
list<string> index_partitions_;
// for partition used map
map<string, int64_t> partition_useds_;
// for partiton timer map
map<string, tuple<int64_t, SteadyTimerPtr> > partition_timeouts_;
// data book
- pthread_mutex_t data_book_mutex_;
+ mutable mutex data_book_mutex_;
// for partition offset cache
map<string, int64_t> partition_offset_;
// for partiton register booked
map<string, bool> part_reg_booked_;
// event
- pthread_mutex_t event_read_mutex_;
- pthread_cond_t event_read_cond_;
+ mutable mutex event_read_mutex_;
+ condition_variable event_read_cond_;
list<ConsumerEvent> rebalance_events_;
- pthread_mutex_t event_write_mutex_;
+ mutable mutex event_write_mutex_;
list<ConsumerEvent> rebalance_results_;
};
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
index df3f09c..3660cbb 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
@@ -99,6 +99,11 @@ class ConsumerConfig : public BaseConfig {
const map<string, set<string> >& subscribed_topic_and_filter_map,
const string& session_key, uint32_t source_count, bool is_select_big,
const map<string, int64_t>& part_offset_map);
+ bool IsBoundConsume() { return is_bound_consume_; }
+ const string& GetSessionKey() const { return session_key_; }
+ const uint32_t GetSourceCount() const { return source_count_; }
+ bool IsSelectBig() { return is_select_big_; }
+ const map<string, int64_t>& GetPartOffsetInfo() const { return part_offset_map_; }
const string& GetGroupName() const;
const map<string, set<string> >& GetSubTopicAndFilterMap() const;
void SetConsumePosition(ConsumePosition consume_from_where);
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
index 370e6e6..ef3357f 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
@@ -32,6 +32,7 @@
namespace tubemq {
using std::stringstream;
+using std::lock_guard;
FlowCtrlResult::FlowCtrlResult() {
this->datasize_limit_ = tb_config::kMaxIntValue;
@@ -171,10 +172,11 @@ FlowCtrlRuleHandler::FlowCtrlRuleHandler() {
this->datalimit_start_time_.Set(2500);
this->datalimit_end_time_.Set(tb_config::kInvalidValue);
this->last_update_time_ = Utils::GetCurrentTimeMillis();
- pthread_rwlock_init(&configrw_lock_, NULL);
}
-FlowCtrlRuleHandler::~FlowCtrlRuleHandler() { pthread_rwlock_destroy(&configrw_lock_); }
+FlowCtrlRuleHandler::~FlowCtrlRuleHandler() {
+ //
+}
void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id,
int64_t flowctrl_id, const string& flowctrl_info) {
@@ -186,7 +188,7 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qryprio
if (flowctrl_info.length() > 0) {
parseFlowCtrlInfo(flowctrl_info, tmp_flowctrl_map);
}
- pthread_rwlock_wrlock(&this->configrw_lock_);
+ lock_guard<mutex> lck(config_lock_);
this->flowctrl_id_.Set(flowctrl_id);
this->qrypriority_id_.Set(qrypriority_id);
clearStatisData();
@@ -199,7 +201,6 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qryprio
initialStatisData();
}
this->last_update_time_ = Utils::GetCurrentTimeMillis();
- pthread_rwlock_unlock(&this->configrw_lock_);
if (is_default) {
LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
flowctrl_id);
@@ -268,51 +269,69 @@ void FlowCtrlRuleHandler::clearStatisData() {
bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt,
FlowCtrlResult& flowctrl_result) const {
struct tm utc_tm;
+ bool result = false;
vector<FlowCtrlItem>::const_iterator it_vec;
map<int, vector<FlowCtrlItem> >::const_iterator it_map;
+ // get current data limit
time_t cur_time = time(NULL);
-
gmtime_r(&cur_time, &utc_tm);
int curr_time = (utc_tm.tm_hour + 8) % 24 * 100 + utc_tm.tm_min;
- if ((last_datadlt < this->min_datadlt_limt_.Get()) ||
- (curr_time < this->datalimit_start_time_.Get()) ||
- (curr_time > this->datalimit_end_time_.Get())) {
+ if ((last_datadlt < this->min_datadlt_limt_.Get())
+ || (curr_time < this->datalimit_start_time_.Get())
+ || (curr_time > this->datalimit_end_time_.Get())) {
return false;
}
+ // search total flowctrl rule
+ lock_guard<mutex> lck(config_lock_);
it_map = this->flowctrl_rules_.find(0);
- if (it_map == this->flowctrl_rules_.end()) {
- return false;
- }
- for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
- if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
- return true;
+ if (it_map != this->flowctrl_rules_.end()) {
+ for (it_vec = it_map->second.begin();it_vec != it_map->second.end(); ++it_vec) {
+ if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
+ result = true;
+ break;
+ }
}
}
- return false;
+ return result;
}
int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt,
int32_t received_limit) const {
- int32_t rule_val = -2;
+ int32_t limit_data = received_limit;
vector<FlowCtrlItem>::const_iterator it_vec;
map<int, vector<FlowCtrlItem> >::const_iterator it_map;
-
+ // check min zero count
if (msg_zero_cnt < this->min_zero_cnt_.Get()) {
- return received_limit;
+ return limit_data;
}
+ // search rule allow value
+ lock_guard<mutex> lck(config_lock_);
it_map = this->flowctrl_rules_.find(1);
- if (it_map == this->flowctrl_rules_.end()) {
- return received_limit;
- }
- for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
- rule_val = it_vec->GetFreLimit(msg_zero_cnt);
- if (rule_val >= 0) {
- return rule_val;
+ if (it_map != this->flowctrl_rules_.end()) {
+ for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
+ limit_data = it_vec->GetFreLimit(msg_zero_cnt);
+ if (limit_data >= 0) {
+ break;
+ }
}
}
- return received_limit;
+ return limit_data;
+}
+
+void FlowCtrlRuleHandler::GetFilterCtrlItem(FlowCtrlItem& result) const {
+ result.Clear();
+ lock_guard<mutex> lck(config_lock_);
+ result = this->filter_ctrl_item_;
+}
+
+void FlowCtrlRuleHandler::GetFlowCtrlInfo(string& flowctrl_info) const {
+ flowctrl_info.clear();
+ lock_guard<mutex> lck(config_lock_);
+ flowctrl_info = this->flowctrl_info_;
}
+
+
bool FlowCtrlRuleHandler::compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2) {
if (o1.GetStartTime() >= o2.GetStartTime()) {
return true;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 55e8cdc..9299f71 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -408,9 +408,9 @@ void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_ha
this->cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 0);
}
}
- this->cur_freqctrl_ = group_flowctrl_handler.GetFilterCtrlItem();
+ group_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
if (this->cur_freqctrl_.GetFreqMsLimit() < 0) {
- this->cur_freqctrl_ = def_flowctrl_handler.GetFilterCtrlItem();
+ def_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
}
curr_time = Utils::GetCurrentTimeMillis();
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index 8b1e76b..c9c499d 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -30,27 +30,18 @@
namespace tubemq {
-
+using std::lock_guard;
+using std::unique_lock;
+using namespace std::placeholders;
RmtDataCacheCsm::RmtDataCacheCsm() {
under_groupctrl_.Set(false);
last_checktime_.Set(0);
- pthread_rwlock_init(&meta_rw_lock_, NULL);
- pthread_mutex_init(&part_mutex_, NULL);
- pthread_mutex_init(&data_book_mutex_, NULL);
- pthread_mutex_init(&event_read_mutex_, NULL);
- pthread_cond_init(&event_read_cond_, NULL);
- pthread_mutex_init(&event_write_mutex_, NULL);
}
RmtDataCacheCsm::~RmtDataCacheCsm() {
- pthread_mutex_destroy(&event_write_mutex_);
- pthread_mutex_destroy(&event_read_mutex_);
- pthread_mutex_destroy(&data_book_mutex_);
- pthread_cond_destroy(&event_read_cond_);
- pthread_mutex_destroy(&part_mutex_);
- pthread_rwlock_destroy(&meta_rw_lock_);
+ //
}
void RmtDataCacheCsm::SetConsumerInfo(const string& client_id,
@@ -103,7 +94,8 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
//
SubscribeInfo sub_info(consumer_id_, group_name_, partition_ext);
string partition_key = partition_ext.GetPartitionKey();
- pthread_rwlock_wrlock(&meta_rw_lock_);
+ // lock operate
+ lock_guard<mutex> lck(meta_lock_);
it_map = partitions_.find(partition_key);
if (it_map == partitions_.end()) {
partitions_[partition_key] = partition_ext;
@@ -130,10 +122,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
part_subinfo_[partition_key] = sub_info;
}
// check partition_key status
- pthread_mutex_lock(&part_mutex_);
resetIdlePartition(partition_key, true);
- pthread_mutex_unlock(&part_mutex_);
- pthread_rwlock_unlock(&meta_rw_lock_);
}
bool RmtDataCacheCsm::SelectPartition(string &err_info,
@@ -142,13 +131,12 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
int64_t booked_time = 0;
string partition_key;
map<string, PartitionExt>::iterator it_map;
-
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ // lock operate
+ lock_guard<mutex> lck(meta_lock_);
if (partitions_.empty()) {
err_info = "No partition info in local cache, please retry later!";
result = false;
} else {
- pthread_mutex_lock(&part_mutex_);
if (index_partitions_.empty()) {
err_info = "No idle partition to consume, please retry later!";
result = false;
@@ -167,9 +155,7 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
err_info = "Ok";
}
}
- pthread_mutex_unlock(&part_mutex_);
}
- pthread_rwlock_unlock(&meta_rw_lock_);
return result;
}
@@ -180,18 +166,16 @@ void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
map<string, PartitionExt>::iterator it_part;
// book partition offset info
if (curr_offset >= 0) {
- pthread_mutex_lock(&data_book_mutex_);
+ lock_guard<mutex> lck1(data_book_mutex_);
partition_offset_[partition_key] = curr_offset;
- pthread_mutex_unlock(&data_book_mutex_);
}
// book partition temp info
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ lock_guard<mutex> lck2(meta_lock_);
it_part = partitions_.find(partition_key);
if (it_part != partitions_.end()) {
it_part->second.BookConsumeData(err_code, msg_size,
esc_limit, limit_dlt, cur_data_dlt, require_slow);
}
- pthread_rwlock_unlock(&meta_rw_lock_);
}
// success process release partition
@@ -233,7 +217,7 @@ void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info
// initial return;
subscribed_partitions.clear();
unsub_partitions.clear();
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ lock_guard<mutex> lck(meta_lock_);
if (partitions_.empty()) {
for (it_lst = subscribe_info_lst.begin(); it_lst != subscribe_info_lst.end(); it_lst++) {
unsub_partitions.push_back(it_lst->GetPartitionExt());
@@ -248,17 +232,15 @@ void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info
}
}
}
- pthread_rwlock_unlock(&meta_rw_lock_);
}
void RmtDataCacheCsm::GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst) {
map<string, SubscribeInfo>::iterator it_sub;
subscribe_info_lst.clear();
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ lock_guard<mutex> lck(meta_lock_);
for (it_sub = part_subinfo_.begin(); it_sub != part_subinfo_.end(); ++it_sub) {
subscribe_info_lst.push_back(it_sub->second);
}
- pthread_rwlock_unlock(&meta_rw_lock_);
}
void RmtDataCacheCsm::GetAllBrokerPartitions(
@@ -267,7 +249,7 @@ void RmtDataCacheCsm::GetAllBrokerPartitions(
map<NodeInfo, list<PartitionExt> >::iterator it_broker;
broker_parts.clear();
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ lock_guard<mutex> lck(meta_lock_);
for (it_part = partitions_.begin(); it_part != partitions_.end(); ++it_part) {
it_broker = broker_parts.find(it_part->second.GetBrokerInfo());
if (it_broker == broker_parts.end()) {
@@ -278,20 +260,18 @@ void RmtDataCacheCsm::GetAllBrokerPartitions(
it_broker->second.push_back(it_part->second);
}
}
- pthread_rwlock_unlock(&meta_rw_lock_);
}
bool RmtDataCacheCsm::GetPartitionExt(const string& part_key, PartitionExt& partition_ext) {
bool result = false;
map<string, PartitionExt>::iterator it_map;
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ lock_guard<mutex> lck(meta_lock_);
it_map = partitions_.find(part_key);
if (it_map != partitions_.end()) {
result = true;
partition_ext = it_map->second;
}
- pthread_rwlock_unlock(&meta_rw_lock_);
return result;
}
@@ -299,11 +279,10 @@ void RmtDataCacheCsm::GetRegBrokers(list<NodeInfo>& brokers) {
map<NodeInfo, set<string> >::iterator it;
brokers.clear();
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ lock_guard<mutex> lck(meta_lock_);
for (it = broker_partition_.begin(); it != broker_partition_.end(); ++it) {
brokers.push_back(it->first);
}
- pthread_rwlock_unlock(&meta_rw_lock_);
}
void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
@@ -313,7 +292,7 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
map<string, PartitionExt>::iterator it_part;
partition_list.clear();
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ lock_guard<mutex> lck(meta_lock_);
it_broker = broker_partition_.find(broker_info);
if (it_broker != broker_partition_.end()) {
for (it_key = it_broker->second.begin();
@@ -324,7 +303,6 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
}
}
}
- pthread_rwlock_unlock(&meta_rw_lock_);
}
@@ -332,11 +310,10 @@ void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t> part_offset_ma
map<string, int64_t>::iterator it;
part_offset_map.clear();
- pthread_mutex_lock(&data_book_mutex_);
+ lock_guard<mutex> lck(data_book_mutex_);
for (it = partition_offset_.begin(); it != partition_offset_.end(); ++it) {
part_offset_map[it->first] = it->second;
}
- pthread_mutex_unlock(&data_book_mutex_);
}
@@ -373,15 +350,12 @@ void RmtDataCacheCsm::RemovePartition(const list<PartitionExt>& partition_list)
void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) {
set<string>::const_iterator it_lst;
- pthread_rwlock_wrlock(&meta_rw_lock_);
+ lock_guard<mutex> lck(meta_lock_);
for (it_lst = partition_keys.begin(); it_lst != partition_keys.end(); it_lst++) {
- pthread_mutex_lock(&part_mutex_);
resetIdlePartition(*it_lst, false);
- pthread_mutex_unlock(&part_mutex_);
// remove meta info set info
rmvMetaInfo(*it_lst);
}
- pthread_rwlock_unlock(&meta_rw_lock_);
}
void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos,
@@ -397,8 +371,7 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
if (subscribe_infos.empty()) {
return;
}
- pthread_rwlock_wrlock(&meta_rw_lock_);
- pthread_mutex_lock(&part_mutex_);
+ lock_guard<mutex> lck(meta_lock_);
for (it = subscribe_infos.begin(); it != subscribe_infos.end(); ++it) {
part_key = it->GetPartitionExt().GetPartitionKey();
it_part = partitions_.find(part_key);
@@ -422,8 +395,6 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
}
resetIdlePartition(part_key, false);
}
- pthread_mutex_unlock(&part_mutex_);
- pthread_rwlock_unlock(&meta_rw_lock_);
}
@@ -431,64 +402,56 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
bool RmtDataCacheCsm::BookPartition(const string& partition_key) {
bool result = false;
map<string, bool>::iterator it;
- pthread_mutex_lock(&data_book_mutex_);
+
+ lock_guard<mutex> lck(data_book_mutex_);
it = part_reg_booked_.find(partition_key);
if (it == part_reg_booked_.end()) {
part_reg_booked_[partition_key] = true;
}
- pthread_mutex_unlock(&data_book_mutex_);
return result;
}
void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
- pthread_mutex_lock(&event_read_mutex_);
+ unique_lock<mutex> lck(event_read_mutex_);
this->rebalance_events_.push_back(event);
- pthread_cond_broadcast(&event_read_cond_);
- pthread_mutex_unlock(&event_read_mutex_);
+ event_read_cond_.notify_all();
}
void RmtDataCacheCsm::TakeEvent(ConsumerEvent& event) {
- pthread_mutex_lock(&event_read_mutex_);
+ unique_lock<mutex> lck(event_read_mutex_);
while (this->rebalance_events_.empty()) {
- pthread_cond_wait(&event_read_cond_, &event_read_mutex_);
+ event_read_cond_.wait(lck);
}
event = rebalance_events_.front();
rebalance_events_.pop_front();
- pthread_mutex_unlock(&event_read_mutex_);
}
void RmtDataCacheCsm::ClearEvent() {
- pthread_mutex_lock(&event_read_mutex_);
+ unique_lock<mutex> lck(event_read_mutex_);
rebalance_events_.clear();
- pthread_mutex_unlock(&event_read_mutex_);
}
void RmtDataCacheCsm::OfferEventResult(const ConsumerEvent& event) {
- pthread_mutex_lock(&event_write_mutex_);
+ lock_guard<mutex> lck(event_write_mutex_);
this->rebalance_events_.push_back(event);
- pthread_mutex_unlock(&event_write_mutex_);
}
bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
bool result = false;
- pthread_mutex_lock(&event_write_mutex_);
+ lock_guard<mutex> lck(event_write_mutex_);
if (!rebalance_events_.empty()) {
event = rebalance_events_.front();
rebalance_events_.pop_front();
result = true;
}
- pthread_mutex_unlock(&event_write_mutex_);
return result;
}
void RmtDataCacheCsm::HandleTimeout(const string partition_key,
const asio::error_code& error) {
if (!error) {
- pthread_rwlock_rdlock(&meta_rw_lock_);
- pthread_mutex_lock(&part_mutex_);
+ lock_guard<mutex> lck(meta_lock_);
resetIdlePartition(partition_key, true);
- pthread_mutex_unlock(&part_mutex_);
- pthread_rwlock_unlock(&meta_rw_lock_);
}
}
@@ -579,18 +542,15 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
if (!result) {
return false;
}
- pthread_rwlock_rdlock(&meta_rw_lock_);
+ lock_guard<mutex> lck(meta_lock_);
it_part = partitions_.find(partition_key);
if (it_part == partitions_.end()) {
// partition is unregister, release partition
- pthread_mutex_lock(&part_mutex_);
partition_useds_.erase(partition_key);
index_partitions_.remove(partition_key);
- pthread_mutex_unlock(&part_mutex_);
err_info = "Not found the partition in Consume Partition set!";
result = false;
} else {
- pthread_mutex_lock(&part_mutex_);
it_used = partition_useds_.find(partition_key);
if (it_used == partition_useds_.end()) {
// partition is release but registered
@@ -619,9 +579,7 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
result = false;
}
}
- pthread_mutex_unlock(&part_mutex_);
}
- pthread_rwlock_unlock(&meta_rw_lock_);
return result;
}