Posted to commits@tubemq.apache.org by gx...@apache.org on 2020/09/15 11:46:44 UTC

[incubator-tubemq] 28/50: [TUBEMQ-269]Create C/C++ RmtDataCache class (#199)

This is an automated email from the ASF dual-hosted git repository.

gxcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 3be5ddc7c315b19776b98996a4096973417ee81a
Author: gosonzhang <46...@qq.com>
AuthorDate: Sat Jul 11 09:00:59 2020 +0000

    [TUBEMQ-269]Create C/C++ RmtDataCache class (#199)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
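    
    A rough sketch of how a consumer loop might drive the new cache
    (illustrative only: the fetch step and the surrounding client wiring
    are assumptions, not part of this commit):
    
        #include <string>
        
        #include "tubemq/rmt_data_cache.h"
        
        using std::string;
        using namespace tubemq;
        
        // book an idle partition, process it, then hand the lease back
        void ConsumeOnce(RmtDataCacheCsm& cache) {
          string err_info;
          string confirm_context;
          PartitionExt partition_ext;
          // SelectPartition books an idle partition and returns a confirm
          // context that must accompany the matching release call
          if (!cache.SelectPartition(err_info, partition_ext, confirm_context)) {
            return;  // no idle partition yet, caller retries later
          }
          // ... fetch and process messages from partition_ext ...
          // args: err_info, is_filterconsume, confirm_context, is_consumed
          cache.RelPartition(err_info, false, confirm_context, true);
        }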
---
 .../include/tubemq/rmt_data_cache.h                |  19 +-
 .../tubemq-client-cpp/src/meta_info.cc             |   4 +-
 .../tubemq-client-cpp/src/rmt_data_cache.cc        | 226 ++++++++++++++++++++-
 3 files changed, 240 insertions(+), 9 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index 11f9018..a4daf91 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -20,10 +20,9 @@
 #ifndef TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
 #define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
 
-#include <stdint.h>
 #include <pthread.h>
+#include <stdint.h>
 
-#include <atomic>
 #include <list>
 #include <map>
 #include <set>
@@ -47,12 +46,26 @@ class RmtDataCacheCsm {
   RmtDataCacheCsm();
   ~RmtDataCacheCsm();
   void AddNewPartition(const PartitionExt& partition_ext);
+  bool SelectPartition(string &err_info,
+           PartitionExt& partition_ext, string& confirm_context);
+  void BookedPartionInfo(const string& partition_key, int64_t curr_offset,
+                            int32_t err_code, bool esc_limit, int32_t msg_size,
+                            int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow);
+  bool RelPartition(string &err_info, bool is_filterconsume,
+                         const string& confirm_context, bool is_consumed);
+  void RemovePartition(const list<PartitionExt>& partition_list);
+  bool RemovePartition(string &err_info, const string& confirm_context);
   void OfferEvent(const ConsumerEvent& event);
   void TakeEvent(ConsumerEvent& event);
   void ClearEvent();
   void OfferEventResult(const ConsumerEvent& event);
   bool PollEventResult(ConsumerEvent& event);
 
+ private:
+  void buildConfirmContext(const string& partition_key,
+                    int64_t booked_time, string& confirm_context);
+  bool parseConfirmContext(string &err_info,
+    const string& confirm_context, string& partition_key, int64_t& booked_time);
 
  private:
   // timer begin
@@ -68,6 +81,8 @@ class RmtDataCacheCsm {
   map<string, set<string> > topic_partition_;
 // broker partition map
   map<NodeInfo, set<string> > broker_partition_;
+  // guards the idle partition list below
+  pthread_mutex_t  part_mutex_;
 // for partition idle list
   list<string> index_partitions_;
   // for partition used map
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 4f0f860..c01c260 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -443,7 +443,9 @@ const uint32_t SubscribeInfo::GetBrokerPort() const { return this->partitionext_
 
 const string& SubscribeInfo::GetTopic() const { return this->partitionext_.GetTopic(); }
 
-const uint32_t SubscribeInfo::GetPartitionId() const { return this->partitionext_.GetPartitionId(); }
+const uint32_t SubscribeInfo::GetPartitionId() const {
+  return this->partitionext_.GetPartitionId();
+}
 
 const string& SubscribeInfo::ToString() const { return this->sub_info_; }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index d239d26..cb596ec 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -18,15 +18,25 @@
  */
 
 #include "tubemq/rmt_data_cache.h"
+
+#include <stdlib.h>
+
+#include <string>
+#include <vector>
+
+#include "tubemq/const_config.h"
 #include "tubemq/meta_info.h"
+#include "tubemq/utils.h"
 
 
 
 namespace tubemq {
- 
+
+
+
 
 RmtDataCacheCsm::RmtDataCacheCsm() {
   pthread_rwlock_init(&meta_rw_lock_, NULL);
+  pthread_mutex_init(&part_mutex_, NULL);
   pthread_mutex_init(&data_book_mutex_, NULL);
   pthread_mutex_init(&event_read_mutex_, NULL);
   pthread_cond_init(&event_read_cond_, NULL);
@@ -38,6 +48,7 @@ RmtDataCacheCsm::~RmtDataCacheCsm() {
   pthread_mutex_destroy(&event_read_mutex_);
   pthread_mutex_destroy(&data_book_mutex_);
   pthread_cond_destroy(&event_read_cond_);
+  pthread_mutex_destroy(&part_mutex_);
   pthread_rwlock_destroy(&meta_rw_lock_);
 }
 
@@ -74,7 +85,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
     }
   }
   // check partition_key status
-  if (partition_useds_.find(partition_key) == partition_useds_.end() 
+  if (partition_useds_.find(partition_key) == partition_useds_.end()
     && partition_timeouts_.find(partition_key) == partition_timeouts_.end()) {
     index_partitions_.remove(partition_key);
     index_partitions_.push_back(partition_key);
@@ -82,6 +93,192 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
   pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
+bool RmtDataCacheCsm::SelectPartition(string &err_info,
+                        PartitionExt& partition_ext, string& confirm_context) {
+  bool result = false;
+  int64_t booked_time = 0;
+  string partition_key;
+  map<string, PartitionExt>::iterator it_map;
+
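+  // lock order: meta read lock first, then the idle-list mutex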
+  pthread_rwlock_rdlock(&meta_rw_lock_);
+  if (partitions_.empty()) {
+    err_info = "No partition info in local cache, please retry later!";
+    result = false;
+  } else {
+    pthread_mutex_lock(&part_mutex_);
+    if (index_partitions_.empty()) {
+      err_info = "No idle partition to consume, please retry later!";
+      result = false;
+    } else {
+      result = false;
+      err_info = "Partition not found in local cache, please retry later!";
+      booked_time = Utils::GetCurrentTimeMillis();
+      partition_key = index_partitions_.front();
+      index_partitions_.pop_front();
+      buildConfirmContext(partition_key, booked_time, confirm_context);
+      it_map = partitions_.find(partition_key);
+      if (it_map != partitions_.end()) {
+        partition_ext = it_map->second;
+        partition_useds_[partition_key] = booked_time;
+        result = true;
+        err_info = "Ok";
+      }
+    }
+    pthread_mutex_unlock(&part_mutex_);
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+  return result;
+}
+
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t curr_offset,
+                                             int32_t err_code, bool esc_limit, int32_t msg_size, 
+                                             int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow) {
+  map<string, PartitionExt>::iterator it_part;
+  // book partition offset info
+  if (curr_offset >= 0) {
+    pthread_mutex_lock(&data_book_mutex_);
+    partition_offset_[partition_key] = curr_offset;
+    pthread_mutex_unlock(&data_book_mutex_);
+  }
+  // book partition temp info
+  pthread_rwlock_rdlock(&meta_rw_lock_);
+  it_part = partitions_.find(partition_key);
+  if (it_part != partitions_.end()) {
+    it_part->second.BookConsumeData(err_code, msg_size,
+              esc_limit, limit_dlt, cur_data_dlt, require_slow);
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+bool RmtDataCacheCsm::RelPartition(string &err_info, bool is_filterconsume,
+                                 const string& confirm_context, bool is_consumed) {
+  int64_t wait_time;
+  int64_t booked_time;
+  string  partition_key;
+  map<string, PartitionExt>::iterator it_Part;
+  map<string, int64_t>::iterator it_used;
+  // parse confirm context  
+  bool result = parseConfirmContext(err_info,
+                      confirm_context, partition_key, booked_time);
+  if (!result) {
+    return false;
+  }
+  pthread_rwlock_rdlock(&meta_rw_lock_);
+  it_Part = partitions_.find(partition_key);
+  if (it_Part == partitions_.end()) {
+    pthread_mutex_lock(&part_mutex_);
+    partition_useds_.erase(partition_key);
+    index_partitions_.remove(partition_key);
+    pthread_mutex_unlock(&part_mutex_);
+    err_info = "Partition not found in the consume partition set!";
+    result = false;
+  } else {
+    pthread_mutex_lock(&part_mutex_);
+    it_used = partition_useds_.find(partition_key);
+    if (it_used == partition_useds_.end()) {
+      index_partitions_.remove(partition_key);
+      index_partitions_.push_back(partition_key);       
+    } else {
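+      // only the booking that issued this confirm context may release the lease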
+      if (it_used->second == booked_time) {
+        partition_useds_.erase(partition_key);
+        wait_time = it_Part->second.ProcConsumeResult(def_flowctrl_handler_, 
+                      group_flowctrl_handler_, is_filterconsume, is_consumed);
+        if (wait_time >= 10) {
+          // todo add timer
+          // end todo
+        } else {
+          // no flow-control wait needed: return the partition to the idle list
+          index_partitions_.remove(partition_key);
+          index_partitions_.push_back(partition_key);
+        }
+        err_info = "Ok";
+        result = true;    
+      } else {
+        err_info = "Illegal confirmContext content: context not equal!";
+        result = false;
+      }
+    }
+    pthread_mutex_unlock(&part_mutex_);
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+  return result;
+}
+
+bool RmtDataCacheCsm::RemovePartition(string &err_info,
+                                  const string& confirm_context) {
+  int64_t booked_time;
+  string  partition_key;
+  map<string, PartitionExt>::iterator it_Part;
+  map<string, set<string> >::iterator it_topic;
+  map<NodeInfo, set<string> >::iterator it_broker;
+  // parse confirm context  
+  bool result = parseConfirmContext(err_info,
+                      confirm_context, partition_key, booked_time);
+  if (!result) {
+    return false;
+  }
+  // remove partition
+  pthread_rwlock_wrlock(&meta_rw_lock_);
+  partition_useds_.erase(partition_key);
+  index_partitions_.remove(partition_key);
+  // todo need modify if timer build finished
+  partition_timeouts_.erase(partition_key);
+  // end todo
+  it_Part = partitions_.find(partition_key);
+  if (it_Part != partitions_.end()) {
+    it_topic = topic_partition_.find(it_Part->second.GetTopic());
+    if (it_topic != topic_partition_.end()) {
+      it_topic->second.erase(it_Part->second.GetPartitionKey());
+      if (it_topic->second.empty()) {
+        topic_partition_.erase(it_Part->second.GetTopic());
+      }
+    }
+    it_broker = broker_partition_.find(it_Part->second.GetBrokerInfo());
+    if (it_broker != broker_partition_.end()) {
+      it_broker->second.erase(it_Part->second.GetPartitionKey());
+      if (it_broker->second.empty()) {
+        broker_partition_.erase(it_Part->second.GetBrokerInfo());
+      }
+    }
+    partitions_.erase(partition_key);
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+  err_info = "Ok";
+  return true;
+}
+
+void RmtDataCacheCsm::RemovePartition(const list<PartitionExt>& partition_list) {
+  list<PartitionExt>::const_iterator it_lst;
+  map<string, PartitionExt>::iterator it_Part;
+  map<string, set<string> >::iterator it_topic;
+  map<NodeInfo, set<string> >::iterator it_broker;
+
+  pthread_rwlock_wrlock(&meta_rw_lock_);
+  for (it_lst = partition_list.begin(); it_lst != partition_list.end(); it_lst++) {
+    partition_useds_.erase(it_lst->GetPartitionKey());
+    index_partitions_.remove(it_lst->GetPartitionKey());
+    partitions_.erase(it_lst->GetPartitionKey());
+    // todo need modify if timer build finished
+    partition_timeouts_.erase(it_lst->GetPartitionKey());
+    // end todo
+    it_topic = topic_partition_.find(it_lst->GetTopic());
+    if (it_topic != topic_partition_.end()) {
+      it_topic->second.erase(it_lst->GetPartitionKey());
+      if (it_topic->second.empty()) {
+        topic_partition_.erase(it_lst->GetTopic());
+      }
+    }
+    it_broker = broker_partition_.find(it_lst->GetBrokerInfo());
+    if (it_broker != broker_partition_.end()) {
+      it_broker->second.erase(it_lst->GetPartitionKey());
+      if (it_broker->second.empty()) {
+        broker_partition_.erase(it_lst->GetBrokerInfo());
+      }
+    }
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+
 void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
   pthread_mutex_lock(&event_read_mutex_);
   this->rebalance_events_.push_back(event);
@@ -123,10 +320,27 @@ bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
   return result;
 }
 
+void RmtDataCacheCsm::buildConfirmContext(const string& partition_key,
+                                   int64_t booked_time, string& confirm_context) {
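+  // context format: partition_key + kDelimiterAt + booked_time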
+  confirm_context.clear();
+  confirm_context += partition_key;
+  confirm_context += delimiter::kDelimiterAt;
+  confirm_context += Utils::Long2str(booked_time);
+}
 
-
-
-
-
+bool RmtDataCacheCsm::parseConfirmContext(string &err_info,
+     const string& confirm_context, string& partition_key, int64_t& booked_time) {
+  vector<string> result;
+  Utils::Split(confirm_context, result, delimiter::kDelimiterAt);
+  // need both the partition key and the booked time
+  if (result.size() < 2) {
+    err_info = "Illegal confirmContext content: irregular value format!";
+    return false;
+  }
+  partition_key = result[0];
+  booked_time = strtoll(result[1].c_str(), NULL, 10);
+  err_info = "Ok";
+  return true;
+}
 
 }  // namespace tubemq
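
Note on the confirm context: SelectPartition hands out a lease token built by
buildConfirmContext, which joins the partition key and the booking timestamp
with delimiter::kDelimiterAt. RelPartition only honors a release whose booked
time still matches the lease recorded in partition_useds_, so a stale confirm
from an earlier booking cannot free a partition that has since been re-booked.
An illustrative token, assuming kDelimiterAt is "@" (the key format here is
made up for the example):

    broker_1:topic_1:0@1594458059000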