Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/13 02:22:47 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-269]Create C/C++ RmtDataCache class (#202)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new 446c9f0 [TUBEMQ-269]Create C/C++ RmtDataCache class (#202)
446c9f0 is described below
commit 446c9f07a0754f8b83b75b23396ba9946a838ecd
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Jul 13 02:22:37 2020 +0000
[TUBEMQ-269]Create C/C++ RmtDataCache class (#202)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../tubemq-client-cpp/include/tubemq/meta_info.h | 3 +
.../include/tubemq/rmt_data_cache.h | 24 +-
.../tubemq-client-cpp/src/meta_info.cc | 36 +++
.../tubemq-client-cpp/src/rmt_data_cache.cc | 320 +++++++++++++++------
4 files changed, 297 insertions(+), 86 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
index 813a9c5..2d2796e 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
@@ -97,6 +97,7 @@ class PartitionExt : public Partition {
PartitionExt(const string& partition_info);
PartitionExt(const NodeInfo& broker_info, const string& part_str);
~PartitionExt();
+ PartitionExt& operator=(const PartitionExt& target);
void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit,
int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow);
int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
@@ -135,6 +136,8 @@ class PartitionExt : public Partition {
class SubscribeInfo {
public:
SubscribeInfo(const string& sub_info);
+ SubscribeInfo(const string& consumer_id,
+ const string& group_name, const PartitionExt& partition_ext);
SubscribeInfo& operator=(const SubscribeInfo& target);
const string& GetConsumerId() const;
const string& GetGroup() const;
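The meta_info.h changes above add a copy-assignment operator to PartitionExt and a
three-argument constructor to SubscribeInfo. A minimal usage sketch follows; the
partition-info string format and the literal ids are placeholders, not part of this
commit:

    // sketch only, assuming the tubemq headers and their using-declarations
    PartitionExt src("0:127.0.0.1:8123#test_topic:0");  // hypothetical format
    PartitionExt dst(src);                              // compiler-generated copy ctor
    dst = src;                                          // new PartitionExt::operator=
    // new SubscribeInfo(consumer_id, group_name, partition_ext) constructor
    SubscribeInfo sub("consumer-1", "group-1", dst);
    const string& group = sub.GetGroup();               // "group-1"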
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index a4daf91..7c25757 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -43,7 +43,7 @@ using std::list;
// consumer remote data cache
class RmtDataCacheCsm {
public:
- RmtDataCacheCsm();
+ RmtDataCacheCsm(const string& client_id, const string& group_name);
~RmtDataCacheCsm();
void AddNewPartition(const PartitionExt& partition_ext);
bool SelectPartition(string &err_info,
@@ -51,10 +51,24 @@ class RmtDataCacheCsm {
void BookedPartionInfo(const string& partition_key, int64_t curr_offset,
int32_t err_code, bool esc_limit, int32_t msg_size,
int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow);
- bool RelPartition(string &err_info, bool is_filterconsume,
+ bool RelPartition(string &err_info, bool filter_consume,
const string& confirm_context, bool is_consumed);
+ bool RelPartition(string &err_info, const string& confirm_context, bool is_consumed);
+ bool RelPartition(string &err_info, bool filter_consume,
+ const string& confirm_context, bool is_consumed,
+ int64_t curr_offset, int32_t err_code, bool esc_limit,
+ int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt);
+ void FilterPartitions(const list<SubscribeInfo>& subscribe_info_lst,
+ list<PartitionExt>& subscribed_partitions, list<PartitionExt>& unsub_partitions);
+ void GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst);
+ bool GetPartitionExt(const string& part_key, PartitionExt& partition_ext);
+ void GetRegBrokers(list<NodeInfo>& brokers);
+ void GetCurPartitionOffsets(map<string, int64_t>& part_offset_map);
+ void GetAllBrokerPartitions(map<NodeInfo, list<PartitionExt> >& broker_parts);
void RemovePartition(const list<PartitionExt>& partition_list);
+ void RemovePartition(const set<string>& partition_keys);
bool RemovePartition(string &err_info, const string& confirm_context);
+ bool BookPartition(const string& partition_key);
void OfferEvent(const ConsumerEvent& event);
void TakeEvent(ConsumerEvent& event);
void ClearEvent();
@@ -62,15 +76,20 @@ class RmtDataCacheCsm {
bool PollEventResult(ConsumerEvent& event);
private:
+ void rmvMetaInfo(const string& partition_key);
void buildConfirmContext(const string& partition_key,
int64_t booked_time, string& confirm_context);
bool parseConfirmContext(string &err_info,
const string& confirm_context, string& partition_key, int64_t& booked_time);
+ bool inRelPartition(string &err_info, bool need_delay_check,
+ bool filter_consume, const string& confirm_context, bool is_consumed);
private:
// timer begin
// timer end
+ string consumer_id_;
+ string group_name_;
// flow ctrl
FlowCtrlRuleHandler group_flowctrl_handler_;
FlowCtrlRuleHandler def_flowctrl_handler_;
@@ -81,6 +100,7 @@ class RmtDataCacheCsm {
map<string, set<string> > topic_partition_;
// broker partition map
map<NodeInfo, set<string> > broker_partition_;
+ map<string, SubscribeInfo> part_subinfo_;
// for idle partitions occupy
pthread_mutex_t part_mutex_;
// for partition idle map
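The rmt_data_cache.h changes above replace the no-argument RmtDataCacheCsm
constructor with one keyed by client id and consumer group, and add a set of
snapshot-style query methods. A short usage sketch with placeholder ids (and
assuming PartitionExt is default-constructible, as the GetPartitionExt
out-parameter implies):

    // sketch only; "consumer-1", "group-1" and "part-key" are placeholders
    RmtDataCacheCsm cache("consumer-1", "group-1");
    list<SubscribeInfo> subs;
    cache.GetSubscribedInfo(subs);        // snapshot of booked subscriptions
    list<NodeInfo> brokers;
    cache.GetRegBrokers(brokers);         // brokers that still hold partitions
    PartitionExt part;
    if (cache.GetPartitionExt("part-key", part)) {
      // the keyed partition is present in the cache
    }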
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index c01c260..aea6239 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -261,6 +261,32 @@ PartitionExt::~PartitionExt() {
//
}
+PartitionExt& PartitionExt::operator=(const PartitionExt& target) {
+ if (this != &target) {
+ // parent class
+ Partition::operator=(target);
+ // child class
+ this->is_last_consumed_ = target.is_last_consumed_;
+ this->cur_flowctrl_ = target.cur_flowctrl_;
+ this->cur_freqctrl_ = target.cur_freqctrl_;
+ this->next_stage_updtime_ = target.next_stage_updtime_;
+ this->next_slice_updtime_ = target.next_slice_updtime_;
+ this->limit_slice_msgsize_ = target.limit_slice_msgsize_;
+ this->cur_stage_msgsize_ = target.cur_stage_msgsize_;
+ this->cur_slice_msgsize_ = target.cur_slice_msgsize_;
+ this->total_zero_cnt_ = target.total_zero_cnt_;
+ this->booked_time_ = target.booked_time_;
+ this->booked_errcode_ = target.booked_errcode_;
+ this->booked_esc_limit_ = target.booked_esc_limit_;
+ this->booked_msgsize_ = target.booked_msgsize_;
+ this->booked_dlt_limit_ = target.booked_dlt_limit_;
+ this->booked_curdata_dlt_ = target.booked_curdata_dlt_;
+ this->booked_require_slow_ = target.booked_require_slow_;
+ }
+ return *this;
+}
+
+
void PartitionExt::BookConsumeData(int32_t errcode, int32_t msg_size,
bool req_esc_limit, int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow) {
this->booked_time_ = Utils::GetCurrentTimeMillis();
@@ -420,6 +448,14 @@ SubscribeInfo::SubscribeInfo(const string& sub_info) {
buildSubInfo();
}
+SubscribeInfo::SubscribeInfo(const string& consumer_id,
+ const string& group_name, const PartitionExt& partition_ext) {
+ this->consumer_id_ = consumer_id;
+ this->group_ = group_name;
+ this->partitionext_ = partition_ext;
+ buildSubInfo();
+}
+
SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) {
if (this != &target) {
this->consumer_id_ = target.consumer_id_;
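PartitionExt::operator= above follows the usual pattern for derived-class copy
assignment: guard against self-assignment, delegate to the base class, then copy
the derived members. A generic, self-contained sketch of the same pattern
(illustrative names, not TubeMQ code):

    class Base {
     public:
      Base& operator=(const Base& other) {
        value_ = other.value_;
        return *this;
      }
     private:
      int value_;
    };
    class Derived : public Base {
     public:
      Derived& operator=(const Derived& other) {
        if (this != &other) {       // self-assignment guard
          Base::operator=(other);   // copy the base sub-object first
          extra_ = other.extra_;    // then the derived members
        }
        return *this;
      }
     private:
      int extra_;
    };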
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index cb596ec..3b86b43 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -34,7 +34,10 @@ namespace tubemq {
-RmtDataCacheCsm::RmtDataCacheCsm() {
+RmtDataCacheCsm::RmtDataCacheCsm(const string& client_id,
+ const string& group_name) {
+ consumer_id_ = client_id;
+ group_name_ = group_name;
pthread_rwlock_init(&meta_rw_lock_, NULL);
pthread_mutex_init(&part_mutex_, NULL);
pthread_mutex_init(&data_book_mutex_, NULL);
@@ -58,6 +61,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
map<string, set<string> >::iterator it_topic;
map<NodeInfo, set<string> >::iterator it_broker;
//
+ SubscribeInfo sub_info(consumer_id_, group_name_, partition_ext);
string partition_key = partition_ext.GetPartitionKey();
pthread_rwlock_wrlock(&meta_rw_lock_);
it_map = partitions_.find(partition_key);
@@ -83,13 +87,16 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
it_broker->second.insert(partition_key);
}
}
+ part_subinfo_[partition_key] = sub_info;
}
// check partition_key status
+ pthread_mutex_lock(&part_mutex_);
if (partition_useds_.find(partition_key) == partition_useds_.end()
&& partition_timeouts_.find(partition_key) == partition_timeouts_.end()) {
index_partitions_.remove(partition_key);
index_partitions_.push_back(partition_key);
}
+ pthread_mutex_unlock(&part_mutex_);
pthread_rwlock_unlock(&meta_rw_lock_);
}
@@ -150,64 +157,137 @@ void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t cur
pthread_rwlock_unlock(&meta_rw_lock_);
}
-bool RmtDataCacheCsm::RelPartition(string &err_info, bool is_filterconsume,
+// release partition after successful processing
+bool RmtDataCacheCsm::RelPartition(string &err_info, bool filter_consume,
const string& confirm_context, bool is_consumed) {
- int64_t wait_time;
+ return inRelPartition(err_info, true, filter_consume, confirm_context, is_consumed);
+}
+
+// release partition without response data
+bool RmtDataCacheCsm::RelPartition(string &err_info,
+ const string& confirm_context, bool is_consumed) {
+ return inRelPartition(err_info, true, false, confirm_context, is_consumed);
+}
+
+// release partition with error response data
+bool RmtDataCacheCsm::RelPartition(string &err_info, bool filter_consume,
+ const string& confirm_context, bool is_consumed,
+ int64_t curr_offset, int32_t err_code, bool esc_limit,
+ int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt) {
int64_t booked_time;
string partition_key;
- map<string, PartitionExt>::iterator it_Part;
- map<string, int64_t>::iterator it_used;
// parse confirm context
bool result = parseConfirmContext(err_info,
confirm_context, partition_key, booked_time);
if (!result) {
return false;
}
+ BookedPartionInfo(partition_key, curr_offset, err_code,
+ esc_limit, msg_size, limit_dlt, cur_data_dlt, false);
+ return inRelPartition(err_info, true,
+ filter_consume, confirm_context, is_consumed);
+}
+
+void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info_lst,
+ list<PartitionExt>& subscribed_partitions, list<PartitionExt>& unsub_partitions) {
+ //
+ map<string, PartitionExt>::iterator it_part;
+ list<SubscribeInfo>::const_iterator it_lst;
+ // initialize output lists
+ subscribed_partitions.clear();
+ unsub_partitions.clear();
pthread_rwlock_rdlock(&meta_rw_lock_);
- it_Part = partitions_.find(partition_key);
- if (it_Part == partitions_.end()) {
- pthread_mutex_lock(&part_mutex_);
- partition_useds_.erase(partition_key);
- index_partitions_.remove(partition_key);
- pthread_mutex_unlock(&part_mutex_);
- err_info = "Not found the partition in Consume Partition set!";
- result = false;
+ if (partitions_.empty()) {
+ for (it_lst = subscribe_info_lst.begin(); it_lst != subscribe_info_lst.end(); it_lst++) {
+ unsub_partitions.push_back(it_lst->GetPartitionExt());
+ }
} else {
- pthread_mutex_lock(&part_mutex_);
- it_used = partition_useds_.find(partition_key);
- if (it_used == partition_useds_.end()) {
- index_partitions_.remove(partition_key);
- index_partitions_.push_back(partition_key);
- } else {
- if (it_used->second == booked_time) {
- partition_useds_.erase(partition_key);
- wait_time = it_Part->second.ProcConsumeResult(def_flowctrl_handler_,
- group_flowctrl_handler_, is_filterconsume, is_consumed);
- if (wait_time >= 10) {
- // todo add timer
- // end todo
- } else {
- partition_useds_.erase(partition_key);
- index_partitions_.remove(partition_key);
- }
- err_info = "Ok";
- result = true;
- } else {
- err_info = "Illegel confirmContext content: context not equal!";
- result = false;
+ for (it_lst = subscribe_info_lst.begin(); it_lst != subscribe_info_lst.end(); it_lst++) {
+ it_part = partitions_.find(it_lst->GetPartitionExt().GetPartitionKey());
+ if (it_part == partitions_.end()) {
+ unsub_partitions.push_back(it_lst->GetPartitionExt());
+ } else {
+ subscribed_partitions.push_back(it_lst->GetPartitionExt());
}
}
- pthread_mutex_unlock(&part_mutex_);
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+void RmtDataCacheCsm::GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst) {
+ map<string, SubscribeInfo>::iterator it_sub;
+ subscribe_info_lst.clear();
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ for (it_sub = part_subinfo_.begin(); it_sub != part_subinfo_.end(); ++it_sub) {
+ subscribe_info_lst.push_back(it_sub->second);
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+void RmtDataCacheCsm::GetAllBrokerPartitions(
+ map<NodeInfo, list<PartitionExt> >& broker_parts) {
+ map<string, PartitionExt>::iterator it_part;
+ map<NodeInfo, list<PartitionExt> >::iterator it_broker;
+
+ broker_parts.clear();
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ for (it_part = partitions_.begin(); it_part != partitions_.end(); ++it_part) {
+ it_broker = broker_parts.find(it_part->second.GetBrokerInfo());
+ if (it_broker == broker_parts.end()) {
+ list<PartitionExt> tmp_part_lst;
+ tmp_part_lst.push_back(it_part->second);
+ broker_parts[it_part->second.GetBrokerInfo()] = tmp_part_lst;
+ } else {
+ it_broker->second.push_back(it_part->second);
+ }
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+
+bool RmtDataCacheCsm::GetPartitionExt(const string& part_key, PartitionExt& partition_ext) {
+ bool result = false;
+ map<string, PartitionExt>::iterator it_map;
+
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ it_map = partitions_.find(part_key);
+ if (it_map != partitions_.end()) {
+ result = true;
+ partition_ext = it_map->second;
}
pthread_rwlock_unlock(&meta_rw_lock_);
return result;
}
+void RmtDataCacheCsm::GetRegBrokers(list<NodeInfo>& brokers) {
+ map<NodeInfo, set<string> >::iterator it;
+
+ brokers.clear();
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ for (it = broker_partition_.begin(); it != broker_partition_.end(); ++it) {
+ brokers.push_back(it->first);
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t>& part_offset_map) {
+ map<string, int64_t>::iterator it;
+
+ part_offset_map.clear();
+ pthread_mutex_lock(&data_book_mutex_);
+ for (it = partition_offset_.begin(); it != partition_offset_.end(); ++it) {
+ part_offset_map[it->first] = it->second;
+ }
+ pthread_mutex_unlock(&data_book_mutex_);
+}
+
+
+//
bool RmtDataCacheCsm::RemovePartition(string &err_info,
const string& confirm_context) {
int64_t booked_time;
string partition_key;
- map<string, PartitionExt>::iterator it_Part;
+ map<string, PartitionExt>::iterator it_part;
map<string, set<string> >::iterator it_topic;
map<NodeInfo, set<string> >::iterator it_broker;
// parse confirm context
@@ -217,68 +297,54 @@ bool RmtDataCacheCsm::RemovePartition,
return false;
}
// remove partition
- pthread_rwlock_wrlock(&meta_rw_lock_);
- partition_useds_.erase(partition_key);
- index_partitions_.remove(partition_key);
- // todo need modify if timer build finished
- partition_timeouts_.erase(partition_key);
- // end todo
- it_Part = partitions_.find(partition_key);
- if (it_Part != partitions_.end()) {
- it_topic = topic_partition_.find(it_Part->second.GetTopic());
- if (it_topic != topic_partition_.end()) {
- it_topic->second.erase(it_Part->second.GetPartitionKey());
- if (it_topic->second.empty()) {
- topic_partition_.erase(it_Part->second.GetTopic());
- }
- }
- it_broker = broker_partition_.find(it_Part->second.GetBrokerInfo());
- if (it_broker != broker_partition_.end()) {
- it_broker->second.erase(it_Part->second.GetPartitionKey());
- if (it_broker->second.empty()) {
- broker_partition_.erase(it_Part->second.GetBrokerInfo());
- }
- }
- partitions_.erase(partition_key);
- }
- pthread_rwlock_unlock(&meta_rw_lock_);
+ set<string> partition_keys;
+ partition_keys.insert(partition_key);
+ RemovePartition(partition_keys);
err_info = "Ok";
return true;
}
void RmtDataCacheCsm::RemovePartition(const list<PartitionExt>& partition_list) {
+ set<string> partition_keys;
list<PartitionExt>::const_iterator it_lst;
- map<string, PartitionExt>::iterator it_Part;
- map<string, set<string> >::iterator it_topic;
- map<NodeInfo, set<string> >::iterator it_broker;
+ for (it_lst = partition_list.begin(); it_lst != partition_list.end(); it_lst++) {
+ partition_keys.insert(it_lst->GetPartitionKey());
+ }
+ RemovePartition(partition_keys);
+}
+
+void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) {
+ set<string>::const_iterator it_lst;
pthread_rwlock_wrlock(&meta_rw_lock_);
- for (it_lst = partition_list.begin(); it_lst != partition_list.end(); it_lst++) {
- partition_useds_.erase(it_lst->GetPartitionKey());
- index_partitions_.remove(it_lst->GetPartitionKey());
- partitions_.erase(it_lst->GetPartitionKey());
+ for (it_lst = partition_keys.begin(); it_lst != partition_keys.end(); it_lst++) {
+ pthread_mutex_lock(&part_mutex_);
+ partition_useds_.erase(*it_lst);
+ index_partitions_.remove(*it_lst);
// todo: modify once the timer is implemented
- partition_timeouts_.erase(it_lst->GetPartitionKey());
+ partition_timeouts_.erase(*it_lst);
// end todo
- it_topic = topic_partition_.find(it_lst->GetTopic());
- if (it_topic != topic_partition_.end()) {
- it_topic->second.erase(it_lst->GetPartitionKey());
- if (it_topic->second.empty()) {
- topic_partition_.erase(it_lst->GetTopic());
- }
- }
- it_broker = broker_partition_.find(it_lst->GetBrokerInfo());
- if (it_broker != broker_partition_.end()) {
- it_broker->second.erase(it_lst->GetPartitionKey());
- if (it_broker->second.empty()) {
- broker_partition_.erase(it_lst->GetBrokerInfo());
- }
- }
+ pthread_mutex_unlock(&part_mutex_);
+ // remove meta info from the index sets
+ rmvMetaInfo(*it_lst);
}
pthread_rwlock_unlock(&meta_rw_lock_);
}
+bool RmtDataCacheCsm::BookPartition(const string& partition_key) {
+ bool result = false;
+ map<string, bool>::iterator it;
+ pthread_mutex_lock(&data_book_mutex_);
+ it = part_reg_booked_.find(partition_key);
+ if (it == part_reg_booked_.end()) {
+ part_reg_booked_[partition_key] = true;
+ result = true;
+ }
+ pthread_mutex_unlock(&data_book_mutex_);
+ return result;
+}
+
void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
pthread_mutex_lock(&event_read_mutex_);
this->rebalance_events_.push_back(event);
@@ -343,4 +408,91 @@ bool RmtDataCacheCsm::parseConfirmContext(string &err_info,
return true;
}
+void RmtDataCacheCsm::rmvMetaInfo(const string& partition_key) {
+ map<string, PartitionExt>::iterator it_part;
+ map<string, set<string> >::iterator it_topic;
+ map<NodeInfo, set<string> >::iterator it_broker;
+ it_part = partitions_.find(partition_key);
+ if (it_part != partitions_.end()) {
+ it_topic = topic_partition_.find(it_part->second.GetTopic());
+ if (it_topic != topic_partition_.end()) {
+ it_topic->second.erase(it_part->second.GetPartitionKey());
+ if (it_topic->second.empty()) {
+ topic_partition_.erase(it_part->second.GetTopic());
+ }
+ }
+ it_broker = broker_partition_.find(it_part->second.GetBrokerInfo());
+ if (it_broker != broker_partition_.end()) {
+ it_broker->second.erase(it_part->second.GetPartitionKey());
+ if (it_broker->second.empty()) {
+ broker_partition_.erase(it_part->second.GetBrokerInfo());
+ }
+ }
+ partitions_.erase(partition_key);
+ part_subinfo_.erase(partition_key);
+ }
+}
+
+bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
+ bool filter_consume, const string& confirm_context, bool is_consumed) {
+ int64_t wait_time;
+ int64_t booked_time;
+ string partition_key;
+ map<string, PartitionExt>::iterator it_part;
+ map<string, int64_t>::iterator it_used;
+ // parse confirm context
+ bool result = parseConfirmContext(err_info,
+ confirm_context, partition_key, booked_time);
+ if (!result) {
+ return false;
+ }
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ it_part = partitions_.find(partition_key);
+ if (it_part == partitions_.end()) {
+ // partition is unregistered, release it
+ pthread_mutex_lock(&part_mutex_);
+ partition_useds_.erase(partition_key);
+ index_partitions_.remove(partition_key);
+ pthread_mutex_unlock(&part_mutex_);
+ err_info = "Not found the partition in Consume Partition set!";
+ result = false;
+ } else {
+ pthread_mutex_lock(&part_mutex_);
+ it_used = partition_useds_.find(partition_key);
+ if (it_used == partition_useds_.end()) {
+ // partition is released but still registered
+ index_partitions_.remove(partition_key);
+ index_partitions_.push_back(partition_key);
+ } else {
+ if (it_used->second == booked_time) {
+ // booked time matches: release the partition
+ partition_useds_.erase(partition_key);
+ index_partitions_.remove(partition_key);
+ wait_time = 0;
+ if (need_delay_check) {
+ wait_time = it_part->second.ProcConsumeResult(def_flowctrl_handler_,
+ group_flowctrl_handler_, filter_consume, is_consumed);
+ }
+ if (wait_time >= 10) {
+ // todo add timer
+ // end todo
+ } else {
+ index_partitions_.push_back(partition_key);
+ }
+ err_info = "Ok";
+ result = true;
+ } else {
+ // partition is used by another thread
+ err_info = "Illegel confirmContext content: context not equal!";
+ result = false;
+ }
+ }
+ pthread_mutex_unlock(&part_mutex_);
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+ return result;
+}
+
+
+
} // namespace tubemq
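The release path above (inRelPartition) takes the meta_rw_lock_ read lock first
and only then part_mutex_ while it mutates the index list, the same order
AddNewPartition uses. A simplified, self-contained sketch of that locking
pattern (illustrative names, not the committed class):

    #include <pthread.h>
    #include <list>
    #include <map>
    #include <string>

    class PartitionIndex {
     public:
      PartitionIndex() {
        pthread_rwlock_init(&meta_rw_lock_, NULL);
        pthread_mutex_init(&part_mutex_, NULL);
      }
      ~PartitionIndex() {
        pthread_rwlock_destroy(&meta_rw_lock_);
        pthread_mutex_destroy(&part_mutex_);
      }
      // requeue a key at the tail of the idle index if it is still registered
      bool Requeue(const std::string& key) {
        bool found = false;
        pthread_rwlock_rdlock(&meta_rw_lock_);  // metadata: shared read lock
        if (meta_.find(key) != meta_.end()) {
          pthread_mutex_lock(&part_mutex_);     // index list: exclusive mutex
          index_.remove(key);
          index_.push_back(key);
          pthread_mutex_unlock(&part_mutex_);
          found = true;
        }
        pthread_rwlock_unlock(&meta_rw_lock_);
        return found;
      }
     private:
      pthread_rwlock_t meta_rw_lock_;
      pthread_mutex_t part_mutex_;
      std::map<std::string, int> meta_;
      std::list<std::string> index_;
    };

Acquiring part_mutex_ only while meta_rw_lock_ is held keeps a single lock order
(rwlock, then mutex) across the add, release, and remove paths, which is what
rules out deadlock between them.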