You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/16 14:28:38 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-283]Adjust C/C++ some file names: add "tubemq_" prefix (#208)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 28f48e0  [TUBEMQ-283]Adjust C/C++ some file names: add "tubemq_" prefix (#208)
28f48e0 is described below

commit 28f48e0d0e3d08344f3ad4a3e92379a056150203
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Jul 16 14:28:24 2020 +0000

    [TUBEMQ-283]Adjust C/C++ some file names: add "tubemq_" prefix (#208)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../include/tubemq/rmt_data_cache.h                | 20 ++++--
 .../tubemq/{client_config.h => tubemq_config.h}    |  1 +
 .../tubemq/{const_errcode.h => tubemq_errcode.h}   |  0
 .../include/tubemq/{message.h => tubemq_message.h} |  0
 .../include/tubemq/tubemq_return.h                 |  2 +-
 .../tubemq-client-cpp/include/tubemq/utils.h       |  2 +-
 .../tubemq-client-cpp/src/meta_info.cc             |  2 +-
 .../tubemq-client-cpp/src/rmt_data_cache.cc        | 84 ++++++++++++++--------
 .../src/{client_config.cc => tubemq_config.cc}     |  4 +-
 .../src/{message.cc => tubemq_message.cc}          |  2 +-
 tubemq-client-twins/tubemq-client-cpp/src/utils.cc |  2 +-
 11 files changed, 76 insertions(+), 43 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index aa8f250..af12ce4 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -27,10 +27,13 @@
 #include <map>
 #include <set>
 #include <string>
+#include <tuple>
 
 #include "tubemq/atomic_def.h"
 #include "tubemq/flowctrl_def.h"
 #include "tubemq/meta_info.h"
+#include "tubemq/executor_pool.h"
+#include "tubemq/tubemq_errcode.h"
 
 
 
@@ -41,13 +44,17 @@ namespace tubemq {
 using std::map;
 using std::set;
 using std::list;
+using std::string;
+using std::tuple;
+
 
 
 // consumer remote data cache
 class RmtDataCacheCsm {
  public:
-  RmtDataCacheCsm(const string& client_id, const string& group_name);
+  RmtDataCacheCsm();
   ~RmtDataCacheCsm();
+  void SetConsumerInfo(const string& client_id, const string& group_name);
   void UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
                                      const string& flowctrl_info);
   void UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
@@ -87,8 +94,11 @@ class RmtDataCacheCsm {
   void ClearEvent();
   void OfferEventResult(const ConsumerEvent& event);
   bool PollEventResult(ConsumerEvent& event);
+  void HandleTimeout(const string partition_key, const asio::error_code& error);
 
  private:
+  void addDelayTimer(const string& part_key, int64_t delay_time);
+  void resetIdlePartition(const string& partition_key, bool need_reuse);
   void rmvMetaInfo(const string& partition_key);
   void buildConfirmContext(const string& partition_key,
                     int64_t booked_time, string& confirm_context);
@@ -99,9 +109,9 @@ class RmtDataCacheCsm {
 
 
  private:
-  // timer begin
-
-  // timer end
+  // timer executor
+  ExecutorPtr executor_;
+  // consumer identity (client id and group name)
   string consumer_id_;
   string group_name_;
   // flow ctrl
@@ -125,7 +135,7 @@ class RmtDataCacheCsm {
   // for partition used map
   map<string, int64_t> partition_useds_;
   // for partiton timer map
-  map<string, int64_t> partition_timeouts_;
+  map<string, tuple<int64_t, SteadyTimerPtr> > partition_timeouts_;
   // data book
   pthread_mutex_t data_book_mutex_;
   // for partition offset cache
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
similarity index 99%
rename from tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
rename to tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
index fecbeb2..df3f09c 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
@@ -33,6 +33,7 @@ using std::map;
 using std::set;
 using std::string;
 
+
 class BaseConfig {
  public:
   BaseConfig();
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_errcode.h
similarity index 100%
rename from tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h
rename to tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_errcode.h
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_message.h
similarity index 100%
rename from tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h
rename to tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_message.h
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
index 96194b4..2f35b22 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
@@ -25,7 +25,7 @@
 #include <list>
 #include <string>
 
-#include "tubemq/message.h"
+#include "tubemq/tubemq_message.h"
 #include "tubemq/meta_info.h"
 
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
index b4e0aeb..a25832f 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
@@ -39,7 +39,7 @@ class Utils {
   // split string to vector
   static void Split(const string& source, vector<string>& result, const string& delimiter);
   // split string to map<string, int>
-  static void Split(const string& source, map<string, int>& result, const string& delimiter_step1,
+  static void Split(const string& source, map<string, int32_t>& result, const string& delimiter_step1,
                     const string& delimiter_step2);
   static void Join(const vector<string>& vec, const string& delimiter, string& target);
   static bool ValidString(string& err_info, const string& source, bool allow_empty, bool pat_match,
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index ae7d72e..55e8cdc 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -25,7 +25,7 @@
 #include <vector>
 
 #include "tubemq/const_config.h"
-#include "tubemq/const_errcode.h"
+#include "tubemq/tubemq_errcode.h"
 #include "tubemq/utils.h"
 
 namespace tubemq {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index ec4876b..8b1e76b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -33,10 +33,7 @@ namespace tubemq {
 
 
 
-RmtDataCacheCsm::RmtDataCacheCsm(const string& client_id,
-                                      const string& group_name) {
-  consumer_id_ = client_id;
-  group_name_ = group_name;
+RmtDataCacheCsm::RmtDataCacheCsm() {
   under_groupctrl_.Set(false);
   last_checktime_.Set(0);
   pthread_rwlock_init(&meta_rw_lock_, NULL);
@@ -56,6 +53,11 @@ RmtDataCacheCsm::~RmtDataCacheCsm() {
   pthread_rwlock_destroy(&meta_rw_lock_);
 }
 
+// Stores the consumer's client id and group name in this cache.
+// Both strings are copied into the corresponding members.
+void RmtDataCacheCsm::SetConsumerInfo(const string& client_id,
+                                            const string& group_name) {
+  group_name_ = group_name;
+  consumer_id_ = client_id;
+}
 
 void RmtDataCacheCsm::UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
                                                  const string& flowctrl_info) {
@@ -129,11 +131,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
   }
   // check partition_key status
   pthread_mutex_lock(&part_mutex_);
-  if (partition_useds_.find(partition_key) == partition_useds_.end()
-    && partition_timeouts_.find(partition_key) == partition_timeouts_.end()) {
-    index_partitions_.remove(partition_key);
-    index_partitions_.push_back(partition_key);
-  }
+  resetIdlePartition(partition_key, true);
   pthread_mutex_unlock(&part_mutex_);
   pthread_rwlock_unlock(&meta_rw_lock_);
 }
@@ -347,9 +345,7 @@ bool RmtDataCacheCsm::RemovePartition(string &err_info,
                                   const string& confirm_context) {
   int64_t booked_time;
   string  partition_key;
-  map<string, PartitionExt>::iterator it_part;
-  map<string, set<string> >::iterator it_topic;
-  map<NodeInfo, set<string> >::iterator it_broker;
+  set<string> partition_keys;
   // parse confirm context
   bool result = parseConfirmContext(err_info,
                       confirm_context, partition_key, booked_time);
@@ -357,7 +353,6 @@ bool RmtDataCacheCsm::RemovePartition(string &err_info,
     return false;
   }
   // remove partiton
-  set<string> partition_keys;
   partition_keys.insert(partition_key);
   RemovePartition(partition_keys);
   err_info = "Ok";
@@ -381,11 +376,7 @@ void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) {
   pthread_rwlock_wrlock(&meta_rw_lock_);
   for (it_lst = partition_keys.begin(); it_lst != partition_keys.end(); it_lst++) {
     pthread_mutex_lock(&part_mutex_);
-    partition_useds_.erase(*it_lst);
-    index_partitions_.remove(*it_lst);
-    // todo need modify if timer build finished
-    partition_timeouts_.erase(*it_lst);
-    // end todo
+    resetIdlePartition(*it_lst, false);
     pthread_mutex_unlock(&part_mutex_);
     // remove meta info set info
     rmvMetaInfo(*it_lst);
@@ -427,13 +418,9 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
       } else {
         it_broker->second.push_back(it_part->second);
       }
-      rmvMetaInfo(part_key);  
+      rmvMetaInfo(part_key);
     }
-    partition_useds_.erase(part_key);
-    index_partitions_.remove(part_key);
-    // todo need modify if timer build finished
-    partition_timeouts_.erase(part_key);
-    // end todo
+    resetIdlePartition(part_key, false);
   }
   pthread_mutex_unlock(&part_mutex_);
   pthread_rwlock_unlock(&meta_rw_lock_);
@@ -494,6 +481,43 @@ bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
   return result;
 }
 
+// Completion handler for the delay timer armed in addDelayTimer().
+// When the wait finished without an error, the partition is reset with
+// need_reuse = true; any error code makes this a no-op.
+void RmtDataCacheCsm::HandleTimeout(const string partition_key,
+                                          const asio::error_code& error) {
+  if (error) {
+    return;
+  }
+  // Lock order: meta read lock first, then the partition mutex.
+  pthread_rwlock_rdlock(&meta_rw_lock_);
+  pthread_mutex_lock(&part_mutex_);
+  resetIdlePartition(partition_key, true);
+  pthread_mutex_unlock(&part_mutex_);
+  pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+// Arms a one-shot timer for partition_key.
+// Creates a steady timer through executor_, sets it to expire delay_time
+// milliseconds from now, registers HandleTimeout as its completion handler,
+// and records (current time in ms, timer) in partition_timeouts_ under
+// partition_key so resetIdlePartition() can find and cancel it.
+void RmtDataCacheCsm::addDelayTimer(const string& partition_key, int64_t delay_time) {
+  tuple<int64_t, SteadyTimerPtr> timer =
+      std::make_tuple(Utils::GetCurrentTimeMillis(), executor_.Get()->CreateSteadyTimer());
+  std::get<1>(timer)->expires_after(std::chrono::milliseconds(delay_time));
+  // HandleTimeout is a non-static member function, so the bound call must
+  // receive `this` as its first argument; partition_key is copied into the
+  // binder and the placeholder receives the timer's error code.
+  // NOTE(review): the handler holds a raw `this` - confirm that this cache
+  // outlives every pending timer.
+  std::get<1>(timer)->async_wait(
+      std::bind(&RmtDataCacheCsm::HandleTimeout, this, partition_key,
+                std::placeholders::_1));
+  partition_timeouts_.insert(std::make_pair(partition_key, timer));
+}
+
+// Clears all per-partition scheduling state for partition_key:
+//   - drops it from partition_useds_,
+//   - cancels and forgets its pending delay timer, if one is registered,
+//   - removes it from index_partitions_.
+// When need_reuse is true and the key is still present in partitions_, the
+// key is appended to the back of index_partitions_ again.
+// The callers visible in this file invoke it with part_mutex_ held.
+void RmtDataCacheCsm::resetIdlePartition(const string& partition_key, bool need_reuse) {
+  partition_useds_.erase(partition_key);
+  auto timer_pos = partition_timeouts_.find(partition_key);
+  if (timer_pos != partition_timeouts_.end()) {
+    std::get<1>(timer_pos->second)->cancel();
+    partition_timeouts_.erase(timer_pos);
+  }
+  index_partitions_.remove(partition_key);
+  if (need_reuse && partitions_.find(partition_key) != partitions_.end()) {
+    index_partitions_.push_back(partition_key);
+  }
+}
+
 void RmtDataCacheCsm::buildConfirmContext(const string& partition_key,
                                    int64_t booked_time, string& confirm_context) {
   confirm_context.clear();
@@ -544,7 +568,7 @@ void RmtDataCacheCsm::rmvMetaInfo(const string& partition_key) {
 
 bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
                      bool filter_consume, const string& confirm_context, bool is_consumed) {
-  int64_t wait_time;
+  int64_t delay_time;
   int64_t booked_time;
   string  partition_key;
   map<string, PartitionExt>::iterator it_part;
@@ -577,14 +601,13 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
         // wait release
         partition_useds_.erase(partition_key);
         index_partitions_.remove(partition_key);
-        wait_time = 0;
+        delay_time = 0;
         if (need_delay_check) {
-          wait_time = it_part->second.ProcConsumeResult(def_flowctrl_handler_,
+          delay_time = it_part->second.ProcConsumeResult(def_flowctrl_handler_,
                         group_flowctrl_handler_, filter_consume, is_consumed);
         }
-        if (wait_time >= 10) {
-          // todo add timer 
-          // end todo
+        if (delay_time > 10) {
+          addDelayTimer(partition_key, delay_time);
         } else {
           index_partitions_.push_back(partition_key);
         }
@@ -603,5 +626,4 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
 }
 
 
-
 }  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
similarity index 96%
rename from tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
rename to tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
index dfd5b8c..b976add 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include "tubemq/client_config.h"
+#include "tubemq/tubemq_config.h"
 
 #include <sstream>
 #include <vector>
@@ -84,7 +84,7 @@ bool BaseConfig::SetMasterAddrInfo(string& err_info, const string& master_addrin
   }
   // parse and verify master address info
   // master_addrinfo's format like ip1:port1,ip2:port2,ip3:port3
-  map<string, int> tgt_address_map;
+  map<string, int32_t> tgt_address_map;
   Utils::Split(master_addrinfo, tgt_address_map, delimiter::kDelimiterComma,
                delimiter::kDelimiterColon);
   if (tgt_address_map.empty()) {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/message.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
similarity index 96%
rename from tubemq-client-twins/tubemq-client-cpp/src/message.cc
rename to tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
index 93e3dd2..f914741 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include "tubemq/message.h"
+#include "tubemq/tubemq_message.h"
 
 #include <string.h>
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index 40d1de5..2fbaecd 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -76,7 +76,7 @@ void Utils::Split(const string& source, vector<string>& result, const string& de
   }
 }
 
-void Utils::Split(const string& source, map<string, int>& result, const string& delimiter_step1,
+void Utils::Split(const string& source, map<string, int32_t>& result, const string& delimiter_step1,
                   const string& delimiter_step2) {
   string item_str;
   string key_str;