You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/13 02:22:47 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-269]Create C/C++ RmtDataCache class (#202)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 446c9f0  [TUBEMQ-269]Create C/C++ RmtDataCache class (#202)
446c9f0 is described below

commit 446c9f07a0754f8b83b75b23396ba9946a838ecd
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Jul 13 02:22:37 2020 +0000

    [TUBEMQ-269]Create C/C++ RmtDataCache class (#202)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq-client-cpp/include/tubemq/meta_info.h   |   3 +
 .../include/tubemq/rmt_data_cache.h                |  24 +-
 .../tubemq-client-cpp/src/meta_info.cc             |  36 +++
 .../tubemq-client-cpp/src/rmt_data_cache.cc        | 320 +++++++++++++++------
 4 files changed, 297 insertions(+), 86 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
index 813a9c5..2d2796e 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
@@ -97,6 +97,7 @@ class PartitionExt : public Partition {
   PartitionExt(const string& partition_info);
   PartitionExt(const NodeInfo& broker_info, const string& part_str);
   ~PartitionExt();
+  PartitionExt& operator=(const PartitionExt& target);
   void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit,
     int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow);
   int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
@@ -135,6 +136,8 @@ class PartitionExt : public Partition {
 class SubscribeInfo {
  public:
   SubscribeInfo(const string& sub_info);
+  SubscribeInfo(const string& consumer_id,
+        const string& group_name, const PartitionExt& partition_ext);
   SubscribeInfo& operator=(const SubscribeInfo& target);
   const string& GetConsumerId() const;
   const string& GetGroup() const;
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index a4daf91..7c25757 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -43,7 +43,7 @@ using std::list;
 // consumer remote data cache
 class RmtDataCacheCsm {
  public:
-  RmtDataCacheCsm();
+  RmtDataCacheCsm(const string& client_id, const string& group_name);
   ~RmtDataCacheCsm();
   void AddNewPartition(const PartitionExt& partition_ext);
   bool SelectPartition(string &err_info,
@@ -51,10 +51,24 @@ class RmtDataCacheCsm {
   void BookedPartionInfo(const string& partition_key, int64_t curr_offset,
                             int32_t err_code, bool esc_limit, int32_t msg_size,
                             int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow);
-  bool RelPartition(string &err_info, bool is_filterconsume,
+  bool RelPartition(string &err_info, bool filter_consume,
                          const string& confirm_context, bool is_consumed);
+  bool RelPartition(string &err_info, const string& confirm_context, bool is_consumed);
+  bool RelPartition(string &err_info, bool filter_consume, 
+                         const string& confirm_context, bool is_consumed,
+                         int64_t curr_offset, int32_t err_code, bool esc_limit,
+                         int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt);
+  void FilterPartitions(const list<SubscribeInfo>& subscribe_info_lst,
+                    list<PartitionExt>& subscribed_partitions, list<PartitionExt>& unsub_partitions);
+  void GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst);
+  bool GetPartitionExt(const string& part_key, PartitionExt& partition_ext);
+  void GetRegBrokers(list<NodeInfo>& brokers);
+  void GetCurPartitionOffsets(map<string, int64_t> part_offset_map);
+  void GetAllBrokerPartitions(map<NodeInfo, list<PartitionExt> >& broker_parts);
   void RemovePartition(const list<PartitionExt>& partition_list);
+  void RemovePartition(const set<string>& partition_keys);
   bool RemovePartition(string &err_info, const string& confirm_context);
+  bool BookPartition(const string& partition_key);
   void OfferEvent(const ConsumerEvent& event);
   void TakeEvent(ConsumerEvent& event);
   void ClearEvent();
@@ -62,15 +76,20 @@ class RmtDataCacheCsm {
   bool PollEventResult(ConsumerEvent& event);
 
  private:
+  void rmvMetaInfo(const string& partition_key);
   void buildConfirmContext(const string& partition_key,
                     int64_t booked_time, string& confirm_context);
   bool parseConfirmContext(string &err_info,
     const string& confirm_context, string& partition_key, int64_t& booked_time);
+  bool inRelPartition(string &err_info, bool need_delay_check,
+    bool filter_consume, const string& confirm_context, bool is_consumed);
 
  private:
   // timer begin
 
   // timer end
+  string consumer_id_;
+  string group_name_;  
   // flow ctrl
   FlowCtrlRuleHandler group_flowctrl_handler_;
   FlowCtrlRuleHandler def_flowctrl_handler_;
@@ -81,6 +100,7 @@ class RmtDataCacheCsm {
   map<string, set<string> > topic_partition_;
   // broker parition map
   map<NodeInfo, set<string> > broker_partition_;
+  map<string, SubscribeInfo>  part_subinfo_;
   // for idle partitions occupy
   pthread_mutex_t  part_mutex_;
   // for partiton idle map
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index c01c260..aea6239 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -261,6 +261,34 @@ PartitionExt::~PartitionExt() {
   //
 }
 
+PartitionExt& PartitionExt::operator=(const PartitionExt& target) {
+  if (this != &target) {
+    // parent class
+    Partition::operator=(target);
+    // child class    
+    this->is_last_consumed_ = target.is_last_consumed_;
+    this->cur_flowctrl_ = target.cur_flowctrl_;
+    this->cur_freqctrl_ = target.cur_freqctrl_;
+    this->next_stage_updtime_ = target.next_stage_updtime_;
+    this->next_slice_updtime_ = target.next_slice_updtime_;
+    this->limit_slice_msgsize_ = target.limit_slice_msgsize_;
+    this->cur_stage_msgsize_ = target.cur_stage_msgsize_;
+    this->cur_slice_msgsize_ = target.cur_slice_msgsize_;
+    this->total_zero_cnt_ = target.total_zero_cnt_;
+    this->booked_time_ = target.booked_time_;
+    this->booked_errcode_ = target.booked_errcode_;
+    this->booked_esc_limit_ = target.booked_esc_limit_;
+    this->booked_msgsize_ = target.booked_msgsize_;
+    this->booked_dlt_limit_ = target.booked_dlt_limit_;
+    this->booked_curdata_dlt_ = target.booked_curdata_dlt_;
+    this->booked_require_slow_ = target.booked_require_slow_;
+    this->booked_errcode_ = target.booked_errcode_;
+    this->booked_errcode_ = target.booked_errcode_;
+  }
+  return *this;
+}
+
+
 void PartitionExt::BookConsumeData(int32_t errcode, int32_t msg_size,
   bool req_esc_limit, int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow) {
   this->booked_time_ = Utils::GetCurrentTimeMillis();
@@ -420,6 +448,14 @@ SubscribeInfo::SubscribeInfo(const string& sub_info) {
   buildSubInfo();
 }
 
+SubscribeInfo::SubscribeInfo(const string& consumer_id,
+        const string& group_name, const PartitionExt& partition_ext) {
+  this->consumer_id_ = consumer_id;
+  this->group_ = group_name;
+  this->partitionext_ = partition_ext;
+  buildSubInfo();
+}
+
 SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) {
   if (this != &target) {
     this->consumer_id_ = target.consumer_id_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index cb596ec..3b86b43 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -34,7 +34,10 @@ namespace tubemq {
 
 
 
-RmtDataCacheCsm::RmtDataCacheCsm() {
+RmtDataCacheCsm::RmtDataCacheCsm(const string& client_id,
+                                      const string& group_name) {
+  consumer_id_ = client_id;
+  group_name_ = group_name;
   pthread_rwlock_init(&meta_rw_lock_, NULL);
   pthread_mutex_init(&part_mutex_, NULL);
   pthread_mutex_init(&data_book_mutex_, NULL);
@@ -58,6 +61,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
   map<string, set<string> >::iterator it_topic;
   map<NodeInfo, set<string> >::iterator it_broker;
   //
+  SubscribeInfo sub_info(consumer_id_, group_name_, partition_ext);
   string partition_key = partition_ext.GetPartitionKey();
   pthread_rwlock_wrlock(&meta_rw_lock_);
   it_map = partitions_.find(partition_key);
@@ -83,13 +87,16 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
         it_broker->second.insert(partition_key);
       }
     }
+    part_subinfo_[partition_key] = sub_info;
   }
   // check partition_key status
+  pthread_mutex_lock(&part_mutex_);
   if (partition_useds_.find(partition_key) == partition_useds_.end()
     && partition_timeouts_.find(partition_key) == partition_timeouts_.end()) {
     index_partitions_.remove(partition_key);
     index_partitions_.push_back(partition_key);
   }
+  pthread_mutex_unlock(&part_mutex_);
   pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
@@ -150,64 +157,137 @@ void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t cur
   pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
-bool RmtDataCacheCsm::RelPartition(string &err_info, bool is_filterconsume,
+// success process release partition
+bool RmtDataCacheCsm::RelPartition(string &err_info, bool filter_consume,
                                  const string& confirm_context, bool is_consumed) {
-  int64_t wait_time;
+  return inRelPartition(err_info, true, filter_consume, confirm_context, is_consumed);
+}
+
+// release partiton without response return
+bool RmtDataCacheCsm::RelPartition(string &err_info,
+                                 const string& confirm_context, bool is_consumed) {
+  return inRelPartition(err_info, true, false, confirm_context, is_consumed);
+}
+
+// release partiton with error response return
+bool RmtDataCacheCsm::RelPartition(string &err_info, bool filter_consume,
+                              const string& confirm_context, bool is_consumed,
+                              int64_t curr_offset, int32_t err_code, bool esc_limit, 
+                              int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt) {
   int64_t booked_time;
   string  partition_key;
-  map<string, PartitionExt>::iterator it_Part;
-  map<string, int64_t>::iterator it_used;
   // parse confirm context  
   bool result = parseConfirmContext(err_info,
                       confirm_context, partition_key, booked_time);
   if (!result) {
     return false;
   }
+  BookedPartionInfo(partition_key, curr_offset, err_code,
+            esc_limit, msg_size, limit_dlt, cur_data_dlt, false);
+  return inRelPartition(err_info, true, 
+    filter_consume, confirm_context, is_consumed);
+}
+
+void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info_lst,
+                    list<PartitionExt>& subscribed_partitions, list<PartitionExt>& unsub_partitions) {
+  //
+  map<string, PartitionExt>::iterator it_part;
+  list<SubscribeInfo>::const_iterator it_lst;
+  // initial return;
+  subscribed_partitions.clear();
+  unsub_partitions.clear();
   pthread_rwlock_rdlock(&meta_rw_lock_);
-  it_Part = partitions_.find(partition_key);
-  if (it_Part == partitions_.end()) {
-    pthread_mutex_lock(&part_mutex_);
-    partition_useds_.erase(partition_key);
-    index_partitions_.remove(partition_key);
-    pthread_mutex_unlock(&part_mutex_);
-    err_info = "Not found the partition in Consume Partition set!";
-    result = false;
+  if (partitions_.empty()) {
+    for (it_lst = subscribe_info_lst.begin(); it_lst != subscribe_info_lst.end(); it_lst++) {
+      unsub_partitions.push_back(it_lst->GetPartitionExt());
+    }
   } else {
-    pthread_mutex_lock(&part_mutex_);
-    it_used = partition_useds_.find(partition_key);
-    if (it_used == partition_useds_.end()) {
-      index_partitions_.remove(partition_key);
-      index_partitions_.push_back(partition_key);       
-    } else {
-      if (it_used->second == booked_time) {
-        partition_useds_.erase(partition_key);
-        wait_time = it_Part->second.ProcConsumeResult(def_flowctrl_handler_, 
-                      group_flowctrl_handler_, is_filterconsume, is_consumed);
-        if (wait_time >= 10) {
-          // todo add timer 
-          // end todo
-        } else {
-          partition_useds_.erase(partition_key);
-          index_partitions_.remove(partition_key);
-        }
-        err_info = "Ok";
-        result = true;    
-      } else {
-        err_info = "Illegel confirmContext content: context not equal!";
-        result = false;
+    for (it_lst = subscribe_info_lst.begin(); it_lst != subscribe_info_lst.end(); it_lst++) {
+      it_part = partitions_.find(it_lst->GetPartitionExt().GetPartitionKey());
+    	if (it_part == partitions_.end()) {
+        unsub_partitions.push_back(it_lst->GetPartitionExt());
+    	} else {
+    		subscribed_partitions.push_back(it_lst->GetPartitionExt());
       }
     }
-    pthread_mutex_unlock(&part_mutex_);
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+void RmtDataCacheCsm::GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst) {
+  map<string, SubscribeInfo>::iterator it_sub;
+  subscribe_info_lst.clear();                                             
+	pthread_rwlock_rdlock(&meta_rw_lock_);
+	for (it_sub = part_subinfo_.begin(); it_sub != part_subinfo_.end(); ++it_sub) {
+    subscribe_info_lst.push_back(it_sub->second);
+	}
+	pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+void RmtDataCacheCsm::GetAllBrokerPartitions(
+                    map<NodeInfo, list<PartitionExt> >& broker_parts) {
+  map<string, PartitionExt>::iterator it_part;
+  map<NodeInfo, list<PartitionExt> >::iterator it_broker;
+
+  broker_parts.clear();
+  pthread_rwlock_rdlock(&meta_rw_lock_);
+  for (it_part = partitions_.begin(); it_part != partitions_.end(); ++it_part) {
+    it_broker = broker_parts.find(it_part->second.GetBrokerInfo());
+    if (it_broker == broker_parts.end()) {
+      list<PartitionExt> tmp_part_lst;
+      tmp_part_lst.push_back(it_part->second);
+      broker_parts[it_part->second.GetBrokerInfo()] = tmp_part_lst;
+    } else {
+      it_broker->second.push_back(it_part->second);
+    }
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+
+bool RmtDataCacheCsm::GetPartitionExt(const string& part_key, PartitionExt& partition_ext) {
+  bool result = false;
+  map<string, PartitionExt>::iterator it_map;
+  
+  pthread_rwlock_rdlock(&meta_rw_lock_);
+  it_map = partitions_.find(part_key);
+  if (it_map != partitions_.end()) {
+    result = true;
+    partition_ext = it_map->second;  
   }
   pthread_rwlock_unlock(&meta_rw_lock_);
   return result;
 }
 
+void RmtDataCacheCsm::GetRegBrokers(list<NodeInfo>& brokers) {
+  map<NodeInfo, set<string> >::iterator it;  
+
+  brokers.clear();
+  pthread_rwlock_rdlock(&meta_rw_lock_);
+  for (it = broker_partition_.begin(); it != broker_partition_.end(); ++it) {
+    brokers.push_back(it->first);
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t> part_offset_map) {
+  map<string, int64_t>::iterator it;
+
+  part_offset_map.clear();
+  pthread_mutex_lock(&data_book_mutex_);
+  for (it = partition_offset_.begin(); it != partition_offset_.end(); ++it) {
+    part_offset_map[it->first] = it->second;
+  }
+  pthread_mutex_unlock(&data_book_mutex_);
+}
+
+
+//
 bool RmtDataCacheCsm::RemovePartition(string &err_info,
                                   const string& confirm_context) {
   int64_t booked_time;
   string  partition_key;
-  map<string, PartitionExt>::iterator it_Part;
+  map<string, PartitionExt>::iterator it_part;
   map<string, set<string> >::iterator it_topic;
   map<NodeInfo, set<string> >::iterator it_broker;
   // parse confirm context  
@@ -217,68 +297,53 @@ bool RmtDataCacheCsm::RemovePartition(string &err_info,
     return false;
   }
   // remove partiton
-  pthread_rwlock_wrlock(&meta_rw_lock_);
-  partition_useds_.erase(partition_key);
-  index_partitions_.remove(partition_key);
-  // todo need modify if timer build finished
-  partition_timeouts_.erase(partition_key);
-  // end todo
-  it_Part = partitions_.find(partition_key);
-  if (it_Part != partitions_.end()) {
-    it_topic = topic_partition_.find(it_Part->second.GetTopic());
-    if (it_topic != topic_partition_.end()) {
-      it_topic->second.erase(it_Part->second.GetPartitionKey());
-      if (it_topic->second.empty()) {
-        topic_partition_.erase(it_Part->second.GetTopic());
-      }
-    }
-    it_broker = broker_partition_.find(it_Part->second.GetBrokerInfo());
-    if (it_broker != broker_partition_.end()) {
-      it_broker->second.erase(it_Part->second.GetPartitionKey());
-      if (it_broker->second.empty()) {
-        broker_partition_.erase(it_Part->second.GetBrokerInfo());
-      }
-    }
-    partitions_.erase(partition_key);
-  }
-  pthread_rwlock_unlock(&meta_rw_lock_);
+  set<string> partition_keys;
+  partition_keys.insert(partition_key);
+  RemovePartition(partition_keys);
   err_info = "Ok";
   return true;
 }
 
 void RmtDataCacheCsm::RemovePartition(const list<PartitionExt>& partition_list) {
+  set<string> partition_keys;
   list<PartitionExt>::const_iterator it_lst;
-  map<string, PartitionExt>::iterator it_Part;
-  map<string, set<string> >::iterator it_topic;
-  map<NodeInfo, set<string> >::iterator it_broker;
+  for (it_lst = partition_list.begin(); it_lst != partition_list.end(); it_lst++) {
+    partition_keys.insert(it_lst->GetPartitionKey());
+  }
+  RemovePartition(partition_keys);
+}
+
+void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) {
+  set<string>::const_iterator it_lst;
 
   pthread_rwlock_wrlock(&meta_rw_lock_);
-  for (it_lst = partition_list.begin(); it_lst != partition_list.end(); it_lst++) {
-    partition_useds_.erase(it_lst->GetPartitionKey());
-    index_partitions_.remove(it_lst->GetPartitionKey());
-    partitions_.erase(it_lst->GetPartitionKey());
+  for (it_lst = partition_keys.begin(); it_lst != partition_keys.end(); it_lst++) {
+    pthread_mutex_lock(&part_mutex_);
+    partition_useds_.erase(*it_lst);
+    index_partitions_.remove(*it_lst);
     // todo need modify if timer build finished
-    partition_timeouts_.erase(it_lst->GetPartitionKey());
+    partition_timeouts_.erase(*it_lst);
     // end todo
-		it_topic = topic_partition_.find(it_lst->GetTopic());
-		if (it_topic != topic_partition_.end()) {
-      it_topic->second.erase(it_lst->GetPartitionKey());
-      if (it_topic->second.empty()) {
-        topic_partition_.erase(it_lst->GetTopic());
-      }
-    }
-    it_broker = broker_partition_.find(it_lst->GetBrokerInfo());
-		if (it_broker != broker_partition_.end()) {
-      it_broker->second.erase(it_lst->GetPartitionKey());
-      if (it_broker->second.empty()) {
-        broker_partition_.erase(it_lst->GetBrokerInfo());
-      }
-    }
+    pthread_mutex_unlock(&part_mutex_);
+    // remove meta info set info
+    rmvMetaInfo(*it_lst);
   }
   pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 
+bool RmtDataCacheCsm::BookPartition(const string& partition_key) {
+  bool result = false;
+  map<string, bool>::iterator it;
+  pthread_mutex_lock(&data_book_mutex_);
+  it = part_reg_booked_.find(partition_key);
+  if (it == part_reg_booked_.end()) {
+    part_reg_booked_[partition_key] = true;
+  }
+  pthread_mutex_unlock(&data_book_mutex_);
+  return result;
+}
+
 void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
   pthread_mutex_lock(&event_read_mutex_);
   this->rebalance_events_.push_back(event);
@@ -343,4 +408,91 @@ bool RmtDataCacheCsm::parseConfirmContext(string &err_info,
   return true;
 }
 
+void RmtDataCacheCsm::rmvMetaInfo(const string& partition_key) {
+  map<string, PartitionExt>::iterator it_part;
+  map<string, set<string> >::iterator it_topic;
+  map<NodeInfo, set<string> >::iterator it_broker;  
+  it_part = partitions_.find(partition_key);
+  if (it_part != partitions_.end()) {
+    it_topic = topic_partition_.find(it_part->second.GetTopic());
+    if (it_topic != topic_partition_.end()) {
+      it_topic->second.erase(it_part->second.GetPartitionKey());
+      if (it_topic->second.empty()) {
+        topic_partition_.erase(it_part->second.GetTopic());
+      }
+    }
+    it_broker = broker_partition_.find(it_part->second.GetBrokerInfo());
+    if (it_broker != broker_partition_.end()) {
+      it_broker->second.erase(it_part->second.GetPartitionKey());
+      if (it_broker->second.empty()) {
+        broker_partition_.erase(it_part->second.GetBrokerInfo());
+      }
+    }
+    partitions_.erase(partition_key);
+    part_subinfo_.erase(partition_key);
+  }
+}
+
+bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
+                     bool filter_consume, const string& confirm_context, bool is_consumed) {
+  int64_t wait_time;
+  int64_t booked_time;
+  string  partition_key;
+  map<string, PartitionExt>::iterator it_part;
+  map<string, int64_t>::iterator it_used;
+  // parse confirm context  
+  bool result = parseConfirmContext(err_info,
+                      confirm_context, partition_key, booked_time);
+  if (!result) {
+    return false;
+  }
+  pthread_rwlock_rdlock(&meta_rw_lock_);
+  it_part = partitions_.find(partition_key);
+  if (it_part == partitions_.end()) {
+    // partition is unregister, release partition
+    pthread_mutex_lock(&part_mutex_);
+    partition_useds_.erase(partition_key);
+    index_partitions_.remove(partition_key);
+    pthread_mutex_unlock(&part_mutex_);
+    err_info = "Not found the partition in Consume Partition set!";
+    result = false;
+  } else {
+    pthread_mutex_lock(&part_mutex_);
+    it_used = partition_useds_.find(partition_key);
+    if (it_used == partition_useds_.end()) {
+      // partition is release but registered
+      index_partitions_.remove(partition_key);
+      index_partitions_.push_back(partition_key);
+    } else {
+      if (it_used->second == booked_time) {
+        // wait release
+        partition_useds_.erase(partition_key);
+        index_partitions_.remove(partition_key);
+        wait_time = 0;
+        if (need_delay_check) {
+          wait_time = it_part->second.ProcConsumeResult(def_flowctrl_handler_,
+                        group_flowctrl_handler_, filter_consume, is_consumed);
+        } 
+        if (wait_time >= 10) {
+          // todo add timer 
+          // end todo
+        } else {
+          index_partitions_.push_back(partition_key);
+        }
+        err_info = "Ok";
+        result = true;    
+      } else {
+        // partiton is used by other thread
+        err_info = "Illegel confirmContext content: context not equal!";
+        result = false;
+      }
+    }
+    pthread_mutex_unlock(&part_mutex_);
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+  return result;
+}
+
+
+
 }  // namespace tubemq