You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by gx...@apache.org on 2020/09/15 11:46:44 UTC
[incubator-tubemq] 28/50: [TUBEMQ-269]Create C/C++ RmtDataCache
class (#199)
This is an automated email from the ASF dual-hosted git repository.
gxcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 3be5ddc7c315b19776b98996a4096973417ee81a
Author: gosonzhang <46...@qq.com>
AuthorDate: Sat Jul 11 09:00:59 2020 +0000
[TUBEMQ-269]Create C/C++ RmtDataCache class (#199)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../include/tubemq/rmt_data_cache.h | 19 +-
.../tubemq-client-cpp/src/meta_info.cc | 4 +-
.../tubemq-client-cpp/src/rmt_data_cache.cc | 226 ++++++++++++++++++++-
3 files changed, 240 insertions(+), 9 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index 11f9018..a4daf91 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -20,10 +20,9 @@
#ifndef TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
#define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
-#include <stdint.h>
#include <pthread.h>
+#include <stdint.h>
-#include <atomic>
#include <list>
#include <map>
#include <set>
@@ -47,12 +46,26 @@ class RmtDataCacheCsm {
RmtDataCacheCsm();
~RmtDataCacheCsm();
void AddNewPartition(const PartitionExt& partition_ext);
+ bool SelectPartition(string &err_info,
+ PartitionExt& partition_ext, string& confirm_context);
+ void BookedPartionInfo(const string& partition_key, int64_t curr_offset,
+ int32_t err_code, bool esc_limit, int32_t msg_size,
+ int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow);
+ bool RelPartition(string &err_info, bool is_filterconsume,
+ const string& confirm_context, bool is_consumed);
+ void RemovePartition(const list<PartitionExt>& partition_list);
+ bool RemovePartition(string &err_info, const string& confirm_context);
void OfferEvent(const ConsumerEvent& event);
void TakeEvent(ConsumerEvent& event);
void ClearEvent();
void OfferEventResult(const ConsumerEvent& event);
bool PollEventResult(ConsumerEvent& event);
+ private:
+ void buildConfirmContext(const string& partition_key,
+ int64_t booked_time, string& confirm_context);
+ bool parseConfirmContext(string &err_info,
+ const string& confirm_context, string& partition_key, int64_t& booked_time);
private:
// timer begin
@@ -68,6 +81,8 @@ class RmtDataCacheCsm {
map<string, set<string> > topic_partition_;
// broker parition map
map<NodeInfo, set<string> > broker_partition_;
+ // for idle partitions occupy
+ pthread_mutex_t part_mutex_;
// for partiton idle map
list<string> index_partitions_;
// for partition used map
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 4f0f860..c01c260 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -443,7 +443,9 @@ const uint32_t SubscribeInfo::GetBrokerPort() const { return this->partitionext_
const string& SubscribeInfo::GetTopic() const { return this->partitionext_.GetTopic(); }
-const uint32_t SubscribeInfo::GetPartitionId() const { return this->partitionext_.GetPartitionId(); }
+const uint32_t SubscribeInfo::GetPartitionId() const {
+ return this->partitionext_.GetPartitionId();
+}
const string& SubscribeInfo::ToString() const { return this->sub_info_; }
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index d239d26..cb596ec 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -18,15 +18,25 @@
*/
#include "tubemq/rmt_data_cache.h"
+
+#include <string>
+
+#include <stdlib.h>
+
+#include "tubemq/const_config.h"
#include "tubemq/meta_info.h"
+#include "tubemq/utils.h"
namespace tubemq {
-
+
+
+
RmtDataCacheCsm::RmtDataCacheCsm() {
pthread_rwlock_init(&meta_rw_lock_, NULL);
+ pthread_mutex_init(&part_mutex_, NULL);
pthread_mutex_init(&data_book_mutex_, NULL);
pthread_mutex_init(&event_read_mutex_, NULL);
pthread_cond_init(&event_read_cond_, NULL);
@@ -38,6 +48,7 @@ RmtDataCacheCsm::~RmtDataCacheCsm() {
pthread_mutex_destroy(&event_read_mutex_);
pthread_mutex_destroy(&data_book_mutex_);
pthread_cond_destroy(&event_read_cond_);
+ pthread_mutex_destroy(&part_mutex_);
pthread_rwlock_destroy(&meta_rw_lock_);
}
@@ -74,7 +85,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
}
}
// check partition_key status
- if (partition_useds_.find(partition_key) == partition_useds_.end()
+ if (partition_useds_.find(partition_key) == partition_useds_.end()
&& partition_timeouts_.find(partition_key) == partition_timeouts_.end()) {
index_partitions_.remove(partition_key);
index_partitions_.push_back(partition_key);
@@ -82,6 +93,192 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
pthread_rwlock_unlock(&meta_rw_lock_);
}
+bool RmtDataCacheCsm::SelectPartition(string &err_info,
+ PartitionExt& partition_ext, string& confirm_context) {
+ bool result = false;
+ int64_t booked_time = 0;
+ string partition_key;
+ map<string, PartitionExt>::iterator it_map;
+
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ if (partitions_.empty()) {
+ err_info = "No partition info in local cache, please retry later!";
+ result = false;
+ } else {
+ pthread_mutex_lock(&part_mutex_);
+ if (index_partitions_.empty()) {
+ err_info = "No idle partition to consume, please retry later!";
+ result = false;
+ } else {
+ result = false;
+ err_info = "No idle partition to consume data 2, please retry later!";
+ booked_time =Utils::GetCurrentTimeMillis();
+ partition_key = index_partitions_.front();
+ index_partitions_.pop_front();
+ buildConfirmContext(partition_key, booked_time, confirm_context);
+ it_map = partitions_.find(partition_key);
+ if (it_map != partitions_.end()) {
+ partition_ext = it_map->second;
+ partition_useds_[partition_key] = booked_time;
+ result = true;
+ err_info = "Ok";
+ }
+ }
+ pthread_mutex_unlock(&part_mutex_);
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+ return result;
+}
+
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t curr_offset,
+ int32_t err_code, bool esc_limit, int32_t msg_size,
+ int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow) {
+ map<string, PartitionExt>::iterator it_part;
+ // book partition offset info
+ if (curr_offset >= 0) {
+ pthread_mutex_lock(&data_book_mutex_);
+ partition_offset_[partition_key] = curr_offset;
+ pthread_mutex_unlock(&data_book_mutex_);
+ }
+ // book partition temp info
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ it_part = partitions_.find(partition_key);
+ if(it_part != partitions_.end()) {
+ it_part->second.BookConsumeData(err_code, msg_size,
+ esc_limit, limit_dlt, cur_data_dlt, require_slow);
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+bool RmtDataCacheCsm::RelPartition(string &err_info, bool is_filterconsume,
+ const string& confirm_context, bool is_consumed) {
+ int64_t wait_time;
+ int64_t booked_time;
+ string partition_key;
+ map<string, PartitionExt>::iterator it_Part;
+ map<string, int64_t>::iterator it_used;
+ // parse confirm context
+ bool result = parseConfirmContext(err_info,
+ confirm_context, partition_key, booked_time);
+ if (!result) {
+ return false;
+ }
+ pthread_rwlock_rdlock(&meta_rw_lock_);
+ it_Part = partitions_.find(partition_key);
+ if (it_Part == partitions_.end()) {
+ pthread_mutex_lock(&part_mutex_);
+ partition_useds_.erase(partition_key);
+ index_partitions_.remove(partition_key);
+ pthread_mutex_unlock(&part_mutex_);
+ err_info = "Not found the partition in Consume Partition set!";
+ result = false;
+ } else {
+ pthread_mutex_lock(&part_mutex_);
+ it_used = partition_useds_.find(partition_key);
+ if (it_used == partition_useds_.end()) {
+ index_partitions_.remove(partition_key);
+ index_partitions_.push_back(partition_key);
+ } else {
+ if (it_used->second == booked_time) {
+ partition_useds_.erase(partition_key);
+ wait_time = it_Part->second.ProcConsumeResult(def_flowctrl_handler_,
+ group_flowctrl_handler_, is_filterconsume, is_consumed);
+ if (wait_time >= 10) {
+ // todo add timer
+ // end todo
+ } else {
+ partition_useds_.erase(partition_key);
+ index_partitions_.remove(partition_key);
+ }
+ err_info = "Ok";
+ result = true;
+ } else {
+ err_info = "Illegel confirmContext content: context not equal!";
+ result = false;
+ }
+ }
+ pthread_mutex_unlock(&part_mutex_);
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+ return result;
+}
+
+bool RmtDataCacheCsm::RemovePartition(string &err_info,
+ const string& confirm_context) {
+ int64_t booked_time;
+ string partition_key;
+ map<string, PartitionExt>::iterator it_Part;
+ map<string, set<string> >::iterator it_topic;
+ map<NodeInfo, set<string> >::iterator it_broker;
+ // parse confirm context
+ bool result = parseConfirmContext(err_info,
+ confirm_context, partition_key, booked_time);
+ if (!result) {
+ return false;
+ }
+ // remove partiton
+ pthread_rwlock_wrlock(&meta_rw_lock_);
+ partition_useds_.erase(partition_key);
+ index_partitions_.remove(partition_key);
+ // todo need modify if timer build finished
+ partition_timeouts_.erase(partition_key);
+ // end todo
+ it_Part = partitions_.find(partition_key);
+ if (it_Part != partitions_.end()) {
+ it_topic = topic_partition_.find(it_Part->second.GetTopic());
+ if (it_topic != topic_partition_.end()) {
+ it_topic->second.erase(it_Part->second.GetPartitionKey());
+ if (it_topic->second.empty()) {
+ topic_partition_.erase(it_Part->second.GetTopic());
+ }
+ }
+ it_broker = broker_partition_.find(it_Part->second.GetBrokerInfo());
+ if (it_broker != broker_partition_.end()) {
+ it_broker->second.erase(it_Part->second.GetPartitionKey());
+ if (it_broker->second.empty()) {
+ broker_partition_.erase(it_Part->second.GetBrokerInfo());
+ }
+ }
+ partitions_.erase(partition_key);
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+ err_info = "Ok";
+ return true;
+}
+
+void RmtDataCacheCsm::RemovePartition(const list<PartitionExt>& partition_list) {
+ list<PartitionExt>::const_iterator it_lst;
+ map<string, PartitionExt>::iterator it_Part;
+ map<string, set<string> >::iterator it_topic;
+ map<NodeInfo, set<string> >::iterator it_broker;
+
+ pthread_rwlock_wrlock(&meta_rw_lock_);
+ for (it_lst = partition_list.begin(); it_lst != partition_list.end(); it_lst++) {
+ partition_useds_.erase(it_lst->GetPartitionKey());
+ index_partitions_.remove(it_lst->GetPartitionKey());
+ partitions_.erase(it_lst->GetPartitionKey());
+ // todo need modify if timer build finished
+ partition_timeouts_.erase(it_lst->GetPartitionKey());
+ // end todo
+ it_topic = topic_partition_.find(it_lst->GetTopic());
+ if (it_topic != topic_partition_.end()) {
+ it_topic->second.erase(it_lst->GetPartitionKey());
+ if (it_topic->second.empty()) {
+ topic_partition_.erase(it_lst->GetTopic());
+ }
+ }
+ it_broker = broker_partition_.find(it_lst->GetBrokerInfo());
+ if (it_broker != broker_partition_.end()) {
+ it_broker->second.erase(it_lst->GetPartitionKey());
+ if (it_broker->second.empty()) {
+ broker_partition_.erase(it_lst->GetBrokerInfo());
+ }
+ }
+ }
+ pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+
void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
pthread_mutex_lock(&event_read_mutex_);
this->rebalance_events_.push_back(event);
@@ -123,10 +320,27 @@ bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
return result;
}
+void RmtDataCacheCsm::buildConfirmContext(const string& partition_key,
+ int64_t booked_time, string& confirm_context) {
+ confirm_context.clear();
+ confirm_context += partition_key;
+ confirm_context += delimiter::kDelimiterAt;
+ confirm_context += Utils::Long2str(booked_time);
+}
-
-
-
-
+bool RmtDataCacheCsm::parseConfirmContext(string &err_info,
+ const string& confirm_context, string& partition_key, int64_t& booked_time) {
+ //
+ vector<string> result;
+ Utils::Split(confirm_context, result, delimiter::kDelimiterAt);
+ if(result.empty()) {
+ err_info = "Illegel confirmContext content: unregular value format!";
+ return false;
+ }
+ partition_key = result[0];
+ booked_time = (int64_t)atol(result[1].c_str());
+ err_info = "Ok";
+ return true;
+}
} // namespace tubemq