You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/21 01:45:09 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-286]Create C/C++ SDK's manager class (#213)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new 09f9d2b [TUBEMQ-286]Create C/C++ SDK's manager class (#213)
09f9d2b is described below
commit 09f9d2b4aeef36a2318851f77b5490f7480be58d
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Jul 21 01:44:58 2020 +0000
[TUBEMQ-286]Create C/C++ SDK's manager class (#213)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../include/tubemq/client_service.h | 13 ++---
.../include/tubemq/client_subinfo.h | 10 ++--
.../include/tubemq/flowctrl_def.h | 2 +-
.../include/tubemq/rmt_data_cache.h | 6 +--
.../include/tubemq/tubemq_config.h | 4 +-
.../tubemq-client-cpp/include/tubemq/utils.h | 1 +
.../tubemq-client-cpp/src/client_service.cc | 18 ++++---
.../tubemq-client-cpp/src/client_subinfo.cc | 27 +++++------
.../tubemq-client-cpp/src/meta_info.cc | 1 -
.../tubemq-client-cpp/src/rmt_data_cache.cc | 8 ++--
tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 56 +++++++++++++++++++++-
11 files changed, 103 insertions(+), 43 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
index 5e0b113..ef4ae78 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
@@ -68,24 +68,25 @@ class TubeMQService : public Singleton<TubeMQService> {
bool IsRunning();
const int32_t GetServiceStatus() const { return service_status_.Get(); }
int32_t GetClientObjCnt();
- bool AddClientObj(string& err_info,
- BaseClient* client_obj, int32_t& client_index);
+ bool AddClientObj(string& err_info, BaseClient* client_obj);
BaseClient* GetClientObj(int32_t client_index) const;
BaseClient* RmvClientObj(int32_t client_index);
- const ExecutorPoolPtr& GetTimerExecutor() const { return timer_executor_; }
- const ExecutorPoolPtr& GetNetWorkExecutor() const { return network_executor_; }
+ const string& GetLocalHost() const { return local_host_; }
+ const ExecutorPool& GetTimerExecutorPool() const { return timer_executor_; }
+ const ExecutorPool& GetNetWorkExecutorPool() const { return network_executor_; }
private:
void iniLogger(const Fileini& fileini, const string& sector);
void shutDownClinets() const;
private:
+ string local_host_;
AtomicInteger service_status_;
AtomicInteger client_index_base_;
mutable mutex mutex_;
map<int32_t, BaseClient*> clients_map_;
- ExecutorPoolPtr timer_executor_;
- ExecutorPoolPtr network_executor_;
+ ExecutorPool timer_executor_;
+ ExecutorPool network_executor_;
};
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
index 613f97c..14bcbbd 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
@@ -26,6 +26,7 @@
#include <set>
#include <string>
#include "tubemq/atomic_def.h"
+#include "tubemq/tubemq_config.h"
namespace tubemq {
@@ -51,14 +52,11 @@ class MasterAddrInfo {
class ClientSubInfo {
public:
ClientSubInfo();
- void SetConsumeTarget(bool bound_consume,
- const map<string, set<string> >& topic_and_filter_map,
- const string& session_key, uint32_t source_count,
- bool select_big, const map<string, int64_t>& part_offset_map);
+ void SetConsumeTarget(const ConsumerConfig& config);
bool CompAndSetNotAllocated(bool expect, bool update);
void BookFstRegistered() { first_registered_.Set(true); }
- bool IsBoundConsume() { return bound_consume_; }
- bool IsNotAllocated() { return not_allocated_.Get(); }
+ bool IsBoundConsume() const { return bound_consume_; }
+ bool IsNotAllocated() const { return not_allocated_.Get(); }
const int64_t GetSubscribedTime() const { return subscribed_time_; }
const string& GetSessionKey() const { return session_key_; }
const uint32_t GetSourceCnt() const { return source_count_; }
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
index 8b99cbd..66b5aba 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
@@ -101,7 +101,7 @@ class FlowCtrlRuleHandler {
int32_t GetMinZeroCnt() const { return this->min_zero_cnt_.Get(); }
int32_t GetQryPriorityId() const { return this->qrypriority_id_.Get(); }
void SetQryPriorityId(int32_t qrypriority_id) { this->qrypriority_id_.Set(qrypriority_id); }
- int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); }
+ const int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); }
private:
void initialStatisData();
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index 98f192e..3b85462 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -63,6 +63,8 @@ class RmtDataCacheCsm {
void UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
int64_t flowctrl_id, const string& flowctrl_info);
const int64_t GetGroupQryPriorityId() const;
+ const int64_t GetDefFlowCtrlId() const { return def_flowctrl_handler_.GetFlowCtrlId(); }
+ const int64_t GetGroupFlowCtrlId() const { return group_flowctrl_handler_.GetFlowCtrlId(); }
bool IsUnderGroupCtrl();
void AddNewPartition(const PartitionExt& partition_ext);
bool SelectPartition(string &err_info,
@@ -91,7 +93,7 @@ class RmtDataCacheCsm {
bool RemovePartition(string &err_info, const string& confirm_context);
void RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos,
bool is_processing_rollback, map<NodeInfo, list<PartitionExt> >& broker_parts);
- bool BookPartition(const string& partition_key);
+ bool IsPartitionFirstReg(const string& partition_key);
void OfferEvent(const ConsumerEvent& event);
void TakeEvent(ConsumerEvent& event);
void ClearEvent();
@@ -112,8 +114,6 @@ class RmtDataCacheCsm {
private:
- // timer executor
- ExecutorPool executor_;
//
string consumer_id_;
string group_name_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
index 3660cbb..f2b9952 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
@@ -99,10 +99,10 @@ class ConsumerConfig : public BaseConfig {
const map<string, set<string> >& subscribed_topic_and_filter_map,
const string& session_key, uint32_t source_count, bool is_select_big,
const map<string, int64_t>& part_offset_map);
- bool IsBoundConsume() { return is_bound_consume_; }
+ bool IsBoundConsume() const { return is_bound_consume_; }
const string& GetSessionKey() const { return session_key_; }
const uint32_t GetSourceCount() const { return source_count_; }
- bool IsSelectBig() { return is_select_big_; }
+ bool IsSelectBig() const { return is_select_big_; }
const map<string, int64_t>& GetPartOffsetInfo() const { return part_offset_map_; }
const string& GetGroupName() const;
const map<string, set<string> >& GetSubTopicAndFilterMap() const;
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
index 3fa9d48..3bd3068 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/utils.h
@@ -52,6 +52,7 @@ class Utils {
static uint32_t IpToInt(const string& ipv4_addr);
static int64_t GetCurrentTimeMillis();
static bool ValidConfigFile(string& err_info, const string& conf_file);
+ static bool GetLocalIPV4Address(string& err_info, string& localhost);
};
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
index 1357123..8e001a7 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
@@ -67,19 +67,25 @@ bool TubeMQService::Start(string& err_info, string conf_file) {
if (!result) {
return result;
}
+ result = Utils::GetLocalIPV4Address(err_info, local_host_);
+ if (!result) {
+ return result;
+ }
if (!service_status_.CompareAndSet(0,1)) {
err_info = "TubeMQ Service has startted or Stopped!";
return false;
}
iniLogger(fileini, sector);
- service_status_.set(2);
+ service_status_.Set(2);
+ err_info = "Ok!";
+ return true;
}
bool TubeMQService::Stop(string& err_info) {
if (service_status_.CompareAndSet(2, -1)) {
shutDownClinets();
- timer_executor_->Close();
- network_executor_->Close();
+ timer_executor_.Close();
+ network_executor_.Close();
}
err_info = "OK!";
return true;
@@ -110,14 +116,14 @@ int32_t TubeMQService::GetClientObjCnt() {
}
-bool TubeMQService::AddClientObj(string& err_info,
- BaseClient* client_obj, int32_t& client_index) {
+bool TubeMQService::AddClientObj(string& err_info, BaseClient* client_obj) {
if (service_status_.Get() != 0) {
err_info = "Service not startted!";
return false;
}
- client_index = client_index_base_.IncrementAndGet();
+ int32_t client_index = client_index_base_.IncrementAndGet();
lock_guard<mutex> lck(mutex_);
+ client_obj->SetClientIndex(client_index);
this->clients_map_[client_index] = client_obj;
err_info = "Ok";
return true;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
index e57950c..a6c0fc1 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
@@ -80,25 +80,23 @@ ClientSubInfo::ClientSubInfo() {
bound_partions_ = "";
}
-void ClientSubInfo::SetConsumeTarget(bool bound_consume,
- const map<string, set<string> >& topic_and_filter_map,
- const string& session_key, uint32_t source_count,
- bool select_big, const map<string, int64_t>& part_offset_map) {
+
+void ClientSubInfo::SetConsumeTarget(const ConsumerConfig& config) {
int32_t count = 0;
string tmpstr = "";
// book register time
subscribed_time_ = Utils::GetCurrentTimeMillis();
//
first_registered_.Set(false);
- bound_consume_ = bound_consume;
- topic_and_filter_map_ = topic_and_filter_map;
+ bound_consume_ = config.IsBoundConsume();
+ topic_and_filter_map_ = config.GetSubTopicAndFilterMap();
// build topic filter info
topics_.clear();
topic_conds_.clear();
set<string>::iterator it_set;
map<string, set<string> >::const_iterator it_topic;
- for (it_topic = topic_and_filter_map.begin();
- it_topic != topic_and_filter_map.end(); it_topic++) {
+ for (it_topic = topic_and_filter_map_.begin();
+ it_topic != topic_and_filter_map_.end(); it_topic++) {
topics_.push_back(it_topic->first);
if (it_topic->second.empty()) {
topic_filter_map_[it_topic->first] = false;
@@ -121,15 +119,16 @@ void ClientSubInfo::SetConsumeTarget(bool bound_consume,
}
// build bound_partition info
- if (bound_consume) {
- session_key_ = session_key;
- source_count_ = source_count;
- select_big_ = select_big;
- assigned_part_map_ = part_offset_map;
+ if (bound_consume_) {
+ session_key_ = config.GetSessionKey();
+ source_count_ = config.GetSourceCount();
+ select_big_ = config.IsSelectBig();
+ assigned_part_map_ = config.GetPartOffsetInfo();
count = 0;
bound_partions_ = "";
map<string, int64_t>::const_iterator it;
- for (it = part_offset_map.begin(); it != part_offset_map.end(); it++) {
+ for (it = assigned_part_map_.begin();
+ it != assigned_part_map_.end(); it++) {
if (count++ > 0) {
bound_partions_ += delimiter::kDelimiterComma;
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 9299f71..4f03dfc 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -426,7 +426,6 @@ void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_ha
SubscribeInfo::SubscribeInfo() {
this->consumer_id_ = " ";
this->group_ = " ";
- this->partitionext_;
buildSubInfo();
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index c9c499d..f97535c 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -22,6 +22,7 @@
#include <stdlib.h>
#include <string>
+#include "tubemq/client_service.h"
#include "tubemq/const_config.h"
#include "tubemq/meta_info.h"
#include "tubemq/utils.h"
@@ -399,7 +400,7 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
-bool RmtDataCacheCsm::BookPartition(const string& partition_key) {
+bool RmtDataCacheCsm::IsPartitionFirstReg(const string& partition_key) {
bool result = false;
map<string, bool>::iterator it;
@@ -458,9 +459,10 @@ void RmtDataCacheCsm::HandleTimeout(const string partition_key,
void RmtDataCacheCsm::addDelayTimer(const string& partition_key, int64_t delay_time) {
// add timer
tuple<int64_t, SteadyTimerPtr> timer =
- std::make_tuple(Utils::GetCurrentTimeMillis(), executor_.Get()->CreateSteadyTimer());
+ std::make_tuple(Utils::GetCurrentTimeMillis(),
+ TubeMQService::Instance().GetTimerExecutorPool().Get()->CreateSteadyTimer());
std::get<1>(timer)->expires_after(std::chrono::milliseconds(delay_time));
- std::get<1>(timer)->async_wait(std::bind(&RmtDataCacheCsm::HandleTimeout, partition_key, _1));
+ std::get<1>(timer)->async_wait(std::bind(&RmtDataCacheCsm::HandleTimeout, this, partition_key, _1));
partition_timeouts_.insert(std::make_pair(partition_key, timer));
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index ba6ac3a..75fcba5 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -19,9 +19,15 @@
#include "tubemq/utils.h"
+#include <arpa/inet.h>
+#include <linux/if.h>
+#include <netinet/in.h>
#include <regex.h>
#include <stdlib.h>
#include <stdio.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
@@ -280,7 +286,7 @@ bool Utils::ValidConfigFile(string& err_info, const string& conf_file) {
err_info = "Configure file is blank";
return false;
}
- fp = fopen(configFile.c_str(),"r");
+ fp = fopen(conf_file.c_str(),"r");
if(fp == NULL) {
err_info = "Open configure file Failed!";
return false;
@@ -290,6 +296,54 @@ bool Utils::ValidConfigFile(string& err_info, const string& conf_file) {
return true;
}
+// Looks up the first usable local IPv4 address and stores its dotted-decimal
+// text in |localhost|.
+//
+// The interface list is obtained with SIOCGIFCONF; an entry is accepted only
+// when it carries an AF_INET address, is not named "lo", is flagged IFF_UP
+// and not IFF_LOOPBACK, and its address lies outside 127.0.0.0/8.
+//
+// @param err_info   receives "Ok" on success, otherwise the failure reason
+// @param localhost  receives the selected address; left untouched on failure
+// @return true when an address was found, false otherwise
+bool Utils::GetLocalIPV4Address(string& err_info, string& localhost) {
+  // A typed array (rather than a raw char buffer) guarantees the alignment
+  // that struct ifreq requires.
+  const int32_t kMaxIfCount = 32;
+  struct ifreq if_list[kMaxIfCount];
+  struct ifconf if_conf;
+  memset(if_list, 0, sizeof(if_list));
+  memset(&if_conf, 0, sizeof(if_conf));
+  if_conf.ifc_len = sizeof(if_list);
+  if_conf.ifc_req = if_list;
+
+  int32_t sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+  if (sockfd < 0) {
+    err_info = "Open the local socket(AF_INET, SOCK_DGRAM) failure!";
+    return false;
+  }
+  // Without this check a failed query would leave ifc_len at the buffer size
+  // and the loop below would scan zero-filled entries.
+  if (ioctl(sockfd, SIOCGIFCONF, &if_conf) < 0) {
+    close(sockfd);
+    err_info = "Query the local interface list(SIOCGIFCONF) failure!";
+    return false;
+  }
+
+  const int32_t if_count =
+      static_cast<int32_t>(if_conf.ifc_len / sizeof(struct ifreq));
+  for (int32_t i = 0; i < if_count; i++) {
+    const struct ifreq& cur_if = if_list[i];
+    // SIOCGIFCONF fills ifr_addr; ifr_flags shares the same union storage
+    // and is not meaningful here, so test the address family explicitly.
+    if (cur_if.ifr_addr.sa_family != AF_INET) {
+      continue;
+    }
+    // sizeof("lo") includes the terminating NUL, so this is an exact match.
+    if (0 == strncmp(cur_if.ifr_name, "lo", sizeof("lo"))) {
+      continue;
+    }
+    struct ifreq if_flag;
+    memset(&if_flag, 0, sizeof(if_flag));
+    memcpy(if_flag.ifr_name, cur_if.ifr_name, sizeof(if_flag.ifr_name));
+    if (ioctl(sockfd, SIOCGIFFLAGS, &if_flag) < 0) {
+      continue;
+    }
+    if ((if_flag.ifr_flags & IFF_LOOPBACK)
+      || !(if_flag.ifr_flags & IFF_UP)) {
+      continue;
+    }
+    const struct sockaddr_in* addr_in =
+        reinterpret_cast<const struct sockaddr_in*>(&cur_if.ifr_addr);
+    // Reject the whole loopback block 127.0.0.0/8 numerically instead of
+    // comparing a text prefix of "127.0.0.1".
+    if ((ntohl(addr_in->sin_addr.s_addr) >> 24) == 127) {
+      continue;
+    }
+    // inet_ntop writes into a caller-owned buffer, unlike inet_ntoa which
+    // returns a pointer to shared static storage.
+    char ip_buf[INET_ADDRSTRLEN] = {0};
+    if (inet_ntop(AF_INET, &addr_in->sin_addr, ip_buf, sizeof(ip_buf)) == NULL) {
+      continue;
+    }
+    localhost = ip_buf;
+    close(sockfd);
+    err_info = "Ok";
+    return true;
+  }
+  close(sockfd);
+  err_info = "Not found the localHost in local OS";
+  return false;
+}
+
} // namespace tubemq