You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/21 01:45:09 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-286]Create C/C++ SDK's manager class (#213)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 09f9d2b  [TUBEMQ-286]Create C/C++ SDK's manager class (#213)
09f9d2b is described below

commit 09f9d2b4aeef36a2318851f77b5490f7480be58d
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Jul 21 01:44:58 2020 +0000

    [TUBEMQ-286]Create C/C++ SDK's manager class (#213)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../include/tubemq/client_service.h                | 13 ++---
 .../include/tubemq/client_subinfo.h                | 10 ++--
 .../include/tubemq/flowctrl_def.h                  |  2 +-
 .../include/tubemq/rmt_data_cache.h                |  6 +--
 .../include/tubemq/tubemq_config.h                 |  4 +-
 .../tubemq-client-cpp/include/tubemq/utils.h       |  1 +
 .../tubemq-client-cpp/src/client_service.cc        | 18 ++++---
 .../tubemq-client-cpp/src/client_subinfo.cc        | 27 +++++------
 .../tubemq-client-cpp/src/meta_info.cc             |  1 -
 .../tubemq-client-cpp/src/rmt_data_cache.cc        |  8 ++--
 tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 56 +++++++++++++++++++++-
 11 files changed, 103 insertions(+), 43 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
index 5e0b113..ef4ae78 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
@@ -68,24 +68,25 @@ class TubeMQService : public Singleton<TubeMQService> {
   bool IsRunning();
   const int32_t  GetServiceStatus() const { return service_status_.Get(); }
   int32_t GetClientObjCnt();
-  bool AddClientObj(string& err_info,
-         BaseClient* client_obj, int32_t& client_index);
+  bool AddClientObj(string& err_info, BaseClient* client_obj);
   BaseClient* GetClientObj(int32_t client_index) const;
   BaseClient* RmvClientObj(int32_t client_index);
-  const ExecutorPoolPtr& GetTimerExecutor() const { return timer_executor_; }
-  const ExecutorPoolPtr& GetNetWorkExecutor() const { return network_executor_; }
+  const string& GetLocalHost() const { return local_host_; }
+  const ExecutorPool& GetTimerExecutorPool() const { return timer_executor_; }
+  const ExecutorPool& GetNetWorkExecutorPool() const { return network_executor_; }
 
  private:
   void iniLogger(const Fileini& fileini, const string& sector);
   void shutDownClinets() const;
 
  private:
+  string local_host_;  
   AtomicInteger service_status_;
   AtomicInteger client_index_base_;
   mutable mutex mutex_;
   map<int32_t, BaseClient*> clients_map_;
-  ExecutorPoolPtr timer_executor_;
-  ExecutorPoolPtr network_executor_;
+  ExecutorPool timer_executor_;
+  ExecutorPool network_executor_;
 };
 
 }  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
index 613f97c..14bcbbd 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
@@ -26,6 +26,7 @@
 #include <set>
 #include <string>
 #include "tubemq/atomic_def.h"
+#include "tubemq/tubemq_config.h"
 
 namespace tubemq {
 
@@ -51,14 +52,11 @@ class MasterAddrInfo {
 class ClientSubInfo {
  public:
   ClientSubInfo();
-  void SetConsumeTarget(bool bound_consume,
-                               const map<string, set<string> >& topic_and_filter_map,
-                               const string& session_key, uint32_t source_count,
-                               bool select_big, const map<string, int64_t>& part_offset_map);
+  void SetConsumeTarget(const ConsumerConfig& config);
   bool CompAndSetNotAllocated(bool expect, bool update);
   void BookFstRegistered() { first_registered_.Set(true); }
-  bool IsBoundConsume() { return bound_consume_; }
-  bool IsNotAllocated() { return not_allocated_.Get(); }
+  bool IsBoundConsume() const { return bound_consume_; }
+  bool IsNotAllocated() const { return not_allocated_.Get(); }
   const int64_t GetSubscribedTime() const { return subscribed_time_; }
   const string& GetSessionKey() const { return session_key_; }
   const uint32_t GetSourceCnt() const { return source_count_; }
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
index 8b99cbd..66b5aba 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
@@ -101,7 +101,7 @@ class FlowCtrlRuleHandler {
   int32_t GetMinZeroCnt() const { return this->min_zero_cnt_.Get(); }
   int32_t GetQryPriorityId() const { return this->qrypriority_id_.Get(); }
   void SetQryPriorityId(int32_t qrypriority_id) { this->qrypriority_id_.Set(qrypriority_id); }
-  int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); }
+  const int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); }
 
  private:
   void initialStatisData();
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index 98f192e..3b85462 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -63,6 +63,8 @@ class RmtDataCacheCsm {
   void UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
                  int64_t flowctrl_id, const string& flowctrl_info);
   const int64_t GetGroupQryPriorityId() const;
+  const int64_t GetDefFlowCtrlId() const { return def_flowctrl_handler_.GetFlowCtrlId(); }
+  const int64_t GetGroupFlowCtrlId() const { return group_flowctrl_handler_.GetFlowCtrlId(); }
   bool IsUnderGroupCtrl();
   void AddNewPartition(const PartitionExt& partition_ext);
   bool SelectPartition(string &err_info,
@@ -91,7 +93,7 @@ class RmtDataCacheCsm {
   bool RemovePartition(string &err_info, const string& confirm_context);
   void RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos,
         bool is_processing_rollback, map<NodeInfo, list<PartitionExt> >& broker_parts);
-  bool BookPartition(const string& partition_key);
+  bool IsPartitionFirstReg(const string& partition_key);
   void OfferEvent(const ConsumerEvent& event);
   void TakeEvent(ConsumerEvent& event);
   void ClearEvent();
@@ -112,8 +114,6 @@ class RmtDataCacheCsm {
 
 
  private:
-  // timer executor
-  ExecutorPool executor_;
   // 
   string consumer_id_;
   string group_name_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
index 3660cbb..f2b9952 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
@@ -99,10 +99,10 @@ class ConsumerConfig : public BaseConfig {
                              const map<string, set<string> >& subscribed_topic_and_filter_map,
                              const string& session_key, uint32_t source_count, bool is_select_big,
                              const map<string, int64_t>& part_offset_map);
-  bool IsBoundConsume() { return is_bound_consume_; }
+  bool IsBoundConsume() const { return is_bound_consume_; }
   const string& GetSessionKey() const { return session_key_; }
   const uint32_t GetSourceCount() const { return source_count_; }
-  bool IsSelectBig() { return is_select_big_; }
+  bool IsSelectBig() const { return is_select_big_; }
   const map<string, int64_t>& GetPartOffsetInfo() const { return part_offset_map_; }
   const string& GetGroupName() const;
   const map<string, set<string> >& GetSubTopicAndFilterMap() const;
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
index 3fa9d48..3bd3068 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
@@ -52,6 +52,7 @@ class Utils {
   static uint32_t IpToInt(const string& ipv4_addr);
   static int64_t GetCurrentTimeMillis();
   static bool ValidConfigFile(string& err_info, const string& conf_file);
+  static bool GetLocalIPV4Address(string& err_info, string& localhost);
 };
 
 }  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
index 1357123..8e001a7 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
@@ -67,19 +67,25 @@ bool TubeMQService::Start(string& err_info, string conf_file) {
   if (!result) {
     return result;
   }
+  result = Utils::GetLocalIPV4Address(err_info, local_host_);
+  if (!result) {
+    return result;
+  }
   if (!service_status_.CompareAndSet(0,1)) {
     err_info = "TubeMQ Service has startted or Stopped!";
     return false;
   }
   iniLogger(fileini, sector);
-  service_status_.set(2);
+  service_status_.Set(2);
+  err_info = "Ok!";
+  return true;
 }
 
 bool TubeMQService::Stop(string& err_info) {
   if (service_status_.CompareAndSet(2, -1)) {
     shutDownClinets();
-    timer_executor_->Close();
-    network_executor_->Close();
+    timer_executor_.Close();
+    network_executor_.Close();
   }
   err_info = "OK!";
   return true;
@@ -110,14 +116,14 @@ int32_t TubeMQService::GetClientObjCnt() {
 }
 
 
-bool TubeMQService::AddClientObj(string& err_info,
-           BaseClient* client_obj, int32_t& client_index) {
+bool TubeMQService::AddClientObj(string& err_info, BaseClient* client_obj) {
   if (service_status_.Get() != 0) {
     err_info = "Service not startted!";
     return false;
   }
-  client_index = client_index_base_.IncrementAndGet();
+  int32_t client_index = client_index_base_.IncrementAndGet();
   lock_guard<mutex> lck(mutex_);
+  client_obj->SetClientIndex(client_index);
   this->clients_map_[client_index] = client_obj;
   err_info = "Ok";
   return true;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
index e57950c..a6c0fc1 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
@@ -80,25 +80,23 @@ ClientSubInfo::ClientSubInfo() {
   bound_partions_ = "";
 }
 
-void ClientSubInfo::SetConsumeTarget(bool bound_consume,
-         const map<string, set<string> >& topic_and_filter_map,
-         const string& session_key, uint32_t source_count,
-         bool select_big, const map<string, int64_t>& part_offset_map) {
+
+void ClientSubInfo::SetConsumeTarget(const ConsumerConfig& config) {
   int32_t count = 0;
   string tmpstr = "";
   // book register time
   subscribed_time_ = Utils::GetCurrentTimeMillis();
   //
   first_registered_.Set(false);
-  bound_consume_ = bound_consume;
-  topic_and_filter_map_ = topic_and_filter_map;
+  bound_consume_ = config.IsBoundConsume();
+  topic_and_filter_map_ = config.GetSubTopicAndFilterMap();
   // build topic filter info
   topics_.clear();
   topic_conds_.clear();
   set<string>::iterator it_set;
   map<string, set<string> >::const_iterator it_topic;
-  for (it_topic = topic_and_filter_map.begin();
-      it_topic != topic_and_filter_map.end(); it_topic++) {
+  for (it_topic = topic_and_filter_map_.begin();
+      it_topic != topic_and_filter_map_.end(); it_topic++) {
     topics_.push_back(it_topic->first);
     if (it_topic->second.empty()) {
       topic_filter_map_[it_topic->first] = false;
@@ -121,15 +119,16 @@ void ClientSubInfo::SetConsumeTarget(bool bound_consume,
   }
 
   // build bound_partition info
-  if (bound_consume) {
-    session_key_ = session_key;
-    source_count_ = source_count;
-    select_big_ = select_big;
-    assigned_part_map_ = part_offset_map;
+  if (bound_consume_) {
+    session_key_ = config.GetSessionKey();
+    source_count_ = config.GetSourceCount();
+    select_big_ = config.IsSelectBig();
+    assigned_part_map_ = config.GetPartOffsetInfo();
     count = 0;
     bound_partions_ = "";
     map<string, int64_t>::const_iterator it;
-    for (it = part_offset_map.begin(); it != part_offset_map.end(); it++) {
+    for (it = assigned_part_map_.begin();
+      it != assigned_part_map_.end(); it++) {
       if (count++ > 0) {
         bound_partions_ += delimiter::kDelimiterComma;
       }
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 9299f71..4f03dfc 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -426,7 +426,6 @@ void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_ha
 SubscribeInfo::SubscribeInfo() {
   this->consumer_id_ = " ";
   this->group_ = " ";
-  this->partitionext_;
   buildSubInfo();
 }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index c9c499d..f97535c 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -22,6 +22,7 @@
 #include <stdlib.h>
 #include <string>
 
+#include "tubemq/client_service.h"
 #include "tubemq/const_config.h"
 #include "tubemq/meta_info.h"
 #include "tubemq/utils.h"
@@ -399,7 +400,7 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
 
 
 
-bool RmtDataCacheCsm::BookPartition(const string& partition_key) {
+bool RmtDataCacheCsm::IsPartitionFirstReg(const string& partition_key) {
   bool result = false;
   map<string, bool>::iterator it;
 
@@ -458,9 +459,10 @@ void RmtDataCacheCsm::HandleTimeout(const string partition_key,
 void RmtDataCacheCsm::addDelayTimer(const string& partition_key, int64_t delay_time) {
   // add timer
   tuple<int64_t, SteadyTimerPtr> timer = 
-      std::make_tuple(Utils::GetCurrentTimeMillis(), executor_.Get()->CreateSteadyTimer());
+      std::make_tuple(Utils::GetCurrentTimeMillis(),
+      TubeMQService::Instance().GetTimerExecutorPool().Get()->CreateSteadyTimer());
   std::get<1>(timer)->expires_after(std::chrono::milliseconds(delay_time));
-  std::get<1>(timer)->async_wait(std::bind(&RmtDataCacheCsm::HandleTimeout, partition_key, _1));
+  std::get<1>(timer)->async_wait(std::bind(&RmtDataCacheCsm::HandleTimeout, this, partition_key, _1));
   partition_timeouts_.insert(std::make_pair(partition_key, timer));          
 }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index ba6ac3a..75fcba5 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -19,9 +19,15 @@
 
 #include "tubemq/utils.h"
 
+#include <arpa/inet.h>
+#include <linux/if.h>
+#include <netinet/in.h>
 #include <regex.h>
 #include <stdlib.h>
 #include <stdio.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
 #include <sys/time.h>
 #include <unistd.h>
 
@@ -280,7 +286,7 @@ bool Utils::ValidConfigFile(string& err_info, const string& conf_file) {
     err_info = "Configure file is blank";
     return false;
   }  
-  fp = fopen(configFile.c_str(),"r");
+  fp = fopen(conf_file.c_str(),"r");
   if(fp == NULL) {
     err_info = "Open configure file Failed!";
     return false;
@@ -290,6 +296,54 @@ bool Utils::ValidConfigFile(string& err_info, const string& conf_file) {
   return true;
 }
 
+// Finds the first IPv4 address on an interface that is up and not loopback.
+// On success stores the dotted-quad text in localhost, sets err_info to "Ok"
+// and returns true; otherwise err_info holds the reason and false is returned.
+bool Utils::GetLocalIPV4Address(string& err_info, string& localhost) {
+  char  buf[1024] = {0};
+  struct ifconf ifconf;
+  ifconf.ifc_len = sizeof(buf);
+  ifconf.ifc_buf = buf;
+  int32_t sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+  if (sockfd < 0) {
+    err_info = "Open the local socket(AF_INET, SOCK_DGRAM) failure!";
+    return false;
+  }
+  if (ioctl(sockfd, SIOCGIFCONF, &ifconf) < 0) {  // a failed query leaves buf unfilled
+    close(sockfd);
+    err_info = "Query the local interface list(SIOCGIFCONF) failure!";
+    return false;
+  }
+  struct ifreq *ifreq = (struct ifreq *)buf;
+  int32_t ip_num = ifconf.ifc_len / sizeof(struct ifreq);
+  for (int32_t i = 0; i < ip_num; i++, ifreq++) {
+    // The family is read from ifr_addr, the union member SIOCGIFCONF fills in.
+    if (ifreq->ifr_addr.sa_family != AF_INET
+      || 0 == strncmp(&ifreq->ifr_name[0], "lo", sizeof("lo"))) {
+      continue;
+    }
+    struct ifreq if_flag;
+    memset(&if_flag, 0, sizeof(if_flag));
+    memcpy(&if_flag.ifr_name[0], &ifreq->ifr_name[0], sizeof(ifreq->ifr_name));
+    if ((ioctl(sockfd, SIOCGIFFLAGS, (char *) &if_flag)) < 0) {
+      continue;
+    }
+    if ((if_flag.ifr_flags & IFF_LOOPBACK) || !(if_flag.ifr_flags & IFF_UP)) {
+      continue;
+    }
+    const char *ip = inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr);
+    if (0 != strncmp(ip, "127.0.0.1", 7)) {  // 7 chars: skips every "127.0.0.*"
+      localhost = ip;
+      close(sockfd);
+      err_info = "Ok";
+      return true;
+    }
+  }
+  close(sockfd);
+  err_info = "Not found the localHost in local OS";
+  return false;
+}
+
 
 }  // namespace tubemq