You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/09 03:29:30 UTC
[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-269]
Create C/C++ RmtDataCache class (#195)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new 5343eb4 [TUBEMQ-269] Create C/C++ RmtDataCache class (#195)
5343eb4 is described below
commit 5343eb44d5f09da30a30ae6baa0dc6a2be05a9d1
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Jul 9 03:29:21 2020 +0000
[TUBEMQ-269] Create C/C++ RmtDataCache class (#195)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../include/tubemq/client_config.h | 3 +-
.../include/tubemq/const_errcode.h | 49 +++++++
.../include/tubemq/flowctrl_def.h | 2 +-
.../tubemq-client-cpp/include/tubemq/message.h | 2 +-
.../tubemq-client-cpp/include/tubemq/meta_info.h | 33 +++++
.../tubemq-client-cpp/src/client_config.cc | 10 +-
.../tubemq-client-cpp/src/file_ini.cc | 4 +
.../tubemq-client-cpp/src/flowctrl_def.cc | 2 +
.../tubemq-client-cpp/src/message.cc | 5 +-
.../tubemq-client-cpp/src/meta_info.cc | 148 ++++++++++++++++++++-
tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 3 +
11 files changed, 250 insertions(+), 11 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
index 95bef67..fecbeb2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
@@ -32,7 +32,6 @@ namespace tubemq {
using std::map;
using std::set;
using std::string;
-using std::vector;
class BaseConfig {
public:
@@ -122,7 +121,7 @@ class ConsumerConfig : public BaseConfig {
private:
bool setGroupConsumeTarget(string& err_info, bool is_bound_consume, const string& group_name,
const map<string, set<string> >& subscribed_topic_and_filter_map,
- const string& session_key, int32_t source_count, bool is_select_big,
+ const string& session_key, uint32_t source_count, bool is_select_big,
const map<string, int64_t>& part_offset_map);
private:
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h
new file mode 100644
index 0000000..c4e6370
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_CONST_ERR_CODE_H_
+#define TUBEMQ_CLIENT_CONST_ERR_CODE_H_
+
+namespace tubemq {
+
+namespace err_code {
+ static const int32_t kErrSuccess = 200;
+ static const int32_t kErrNotReady = 201;
+ static const int32_t kErrMoved = 301;
+
+ static const int32_t kErrBadRequest = 400;
+ static const int32_t kErrUnAuthorized = 401;
+ static const int32_t kErrForbidden = 403;
+ static const int32_t kErrNotFound = 404;
+ static const int32_t kErrPartitionOccupied = 410;
+ static const int32_t kErrHbNoNode = 411;
+ static const int32_t kErrDuplicatePartition = 412;
+ static const int32_t kErrCertificateFailure = 415;
+ static const int32_t kErrServerOverflow = 419;
+ static const int32_t kErrConsumeGroupForbidden = 450;
+ static const int32_t kErrConsumeSpeedLimit = 452;
+ static const int32_t kErrConsumeContentForbidden = 455;
+
+ static const int32_t kErrServerError = 500;
+ static const int32_t kErrServiceUnavilable = 503;
+ static const int32_t kErrServerMsgsetNullError = 510;
+ static const int32_t kErrWaitServerRspTimeout = 550;
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_CONST_ERR_CODE_H_
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
index 98d8be4..0f05bb3 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
@@ -91,7 +91,7 @@ class FlowCtrlRuleHandler {
~FlowCtrlRuleHandler();
void UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id, int64_t flowctrl_id,
const string& flowctrl_info);
- bool GetCurDataLimit(int32_t last_datadlt, FlowCtrlResult& flowctrl_result);
+ bool GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result);
int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit);
int32_t GetMinZeroCnt() { return this->min_zero_cnt_.Get(); }
int32_t GetQryPriorityId() { return this->qrypriority_id_.Get(); }
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h
index 945b59e..8d68201 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h
@@ -45,7 +45,7 @@ class Message {
void SetTopic(const string& topic);
const char* GetData() const;
uint32_t GetDataLength() const;
- void setData(const char* data, int datalen);
+ void setData(const char* data, uint32_t datalen);
const int32_t GetFlag() const;
void SetFlag(int32_t flag);
const map<string, string>& GetProperties() const;
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
index e99939d..6dcd324 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
@@ -141,6 +141,39 @@ class ConsumerEvent {
class PartitionExt : public Partition {
public:
PartitionExt();
+ PartitionExt(const string& partition_info);
+ PartitionExt(const NodeInfo& broker_info, const string& part_str);
+ ~PartitionExt();
+ void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit,
+ int64_t rsp_dlt_limit, long last_datadlt, bool require_slow);
+ int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+ const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed);
+ int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+ const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed,
+ int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t rsp_dlt_limit,
+ int64_t last_datadlt, bool require_slow);
+ void SetLastConsumed(bool last_consumed);
+ bool IsLastConsumed();
+ private:
+ void resetParameters();
+
+ private:
+ bool is_last_consumed_;
+ FlowCtrlResult cur_flowctrl_;
+ FlowCtrlItem cur_freqctrl_;
+ int64_t next_stage_updtime_;
+ int64_t next_slice_updtime_;
+ int64_t limit_slice_msgsize_;
+ int64_t cur_stage_msgsize_;
+ int64_t cur_slice_msgsize_;
+ int32_t total_zero_cnt_;
+ int64_t booked_time_;
+ int32_t booked_errcode_;
+ bool booked_esc_limit_;
+ int32_t booked_msgsize_;
+ int64_t booked_dlt_limit_;
+ int64_t booked_curdata_dlt_;
+ bool booked_require_slow_;
};
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
index 6c1f757..c6da13d 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
@@ -28,6 +28,11 @@
namespace tubemq {
+using std::stringstream;
+using std::set;
+using std::vector;
+
+
BaseConfig::BaseConfig() {
this->master_addrinfo_ = "";
this->auth_enable_ = false;
@@ -391,11 +396,6 @@ bool ConsumerConfig::setGroupConsumeTarget(
}
return false;
}
- // check source_count
- if (source_count <= 0) {
- err_info = "Illegal parameter: source_count must over zero!";
- return false;
- }
// check part_offset_map
string part_key;
map<string, int64_t> tmp_parts_map;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
index fe8edcd..97cf806 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
@@ -29,6 +29,10 @@
namespace tubemq {
+using std::ifstream;
+
+
+
Fileini::Fileini() {
this->init_flag_ = false;
this->ini_map_.clear();
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
index 1975306..27caba8 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
@@ -31,6 +31,8 @@
namespace tubemq {
+using std::stringstream;
+
FlowCtrlResult::FlowCtrlResult() {
this->datasize_limit_ = config::kMaxIntValue;
this->freqms_limit_ = 0;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/message.cc b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
index 7fb83f8..d0afacf 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
@@ -28,6 +28,9 @@
namespace tubemq {
+using std::stringstream;
+
+
// message flag's properties settings
static const int32_t kMsgFlagIncProperties = 0x01;
// reserved property key Filter Item
@@ -73,7 +76,7 @@ Message& Message::operator=(const Message& target) {
return *this;
}
-const uint64_t Message::GetMessageId() const { return this->message_id_; }
+const int64_t Message::GetMessageId() const { return this->message_id_; }
void Message::SetMessageId(int64_t message_id) { this->message_id_ = message_id; }
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index c1f428c..81ecf79 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -25,11 +25,12 @@
#include <vector>
#include "const_config.h"
+#include "const_errcode.h"
#include "utils.h"
namespace tubemq {
-using std::sstream;
+using std::stringstream;
using std::vector;
NodeInfo::NodeInfo() {
@@ -381,4 +382,149 @@ string ConsumerEvent::ToString() {
return ss.str();
}
+
+PartitionExt::PartitionExt() : Partition() {
+ resetParameters();
+}
+
+PartitionExt::PartitionExt(const string& partition_info) : Partition(partition_info) {
+ resetParameters();
+}
+
+PartitionExt::PartitionExt(const NodeInfo& broker_info, const string& part_str)
+ : Partition(broker_info, part_str) {
+ resetParameters();
+}
+
+PartitionExt::~PartitionExt() {
+ //
+}
+
+void PartitionExt::BookConsumeData(int32_t errcode, int32_t msg_size,
+ bool req_esc_limit, int64_t rsp_dlt_limit, long last_datadlt, bool require_slow) {
+ this->booked_time_ =Utils::GetCurrentTimeMillis();
+ this->booked_errcode_ = errcode;
+ this->booked_esc_limit_= req_esc_limit;
+ this->booked_msgsize_ = msg_size;
+ this->booked_dlt_limit_ = rsp_dlt_limit;
+ this->booked_curdata_dlt_ = last_datadlt;
+ this->booked_require_slow_ = require_slow;
+}
+
+int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+ const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed) {
+ int64_t dlt_time = Utils::GetCurrentTimeMillis() - this->booked_time_;
+ return ProcConsumeResult(def_flowctrl_handler, group_flowctrl_handler, filter_consume,
+ last_consumed, this->booked_errcode_, this->booked_msgsize_, this->booked_esc_limit_,
+ this->booked_dlt_limit_, this->booked_curdata_dlt_, this->booked_require_slow_) - dlt_time;
+}
+
+int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+ const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed,
+ int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t rsp_dlt_limit,
+ int64_t last_datadlt, bool require_slow) {
+ bool result = false;
+ // Accumulated data received
+ this->_isLastConsumed = last_consumed;
+ this->cur_stage_msgsize_ += msg_size;
+ this->cur_slice_msgsize_ += msg_size;
+ // Update strategy data values
+ int64_t curr_time = Utils::GetCurrentTimeMillis();
+ if (curr_time - this->next_stage_updtime_) {
+ this->cur_stage_msgsize_ = 0;
+ this->cur_slice_msgsize_ = 0;
+ if (last_datadlt >= 0) {
+ result = group_flowctrl_handler.GetCurDataLimit(last_datadlt, this->cur_flowctrl_);
+ if (!result) {
+ result = def_flowctrl_handler.GetCurDataLimit(last_datadlt, this->cur_flowctrl_);
+ if (!result) {
+ this->cur_flowctrl_.SetDataDltAndFreqLimit(config::kMaxLongValue, 0);
+ }
+ }
+ this->cur_freqctrl_ = group_flowctrl_handler.GetFilterCtrlItem();
+ if (this->cur_freqctrl_.getFreqLtInMs() < 0) {
+ this->cur_freqctrl_ = def_flowctrl_handler.GetFilterCtrlItem();
+ }
+ curr_time = Utils::GetCurrentTimeMillis();
+ }
+ this->limit_slice_msgsize_ = this->cur_flowctrl_.GetDataSizeLimit() / 12;
+ this->next_stage_updtime_ = curr_time + 60000;
+ this->next_slice_updtime_ = curr_time + 5000;
+ } else if(curr_time > this->next_slice_updtime_) {
+ this->cur_slice_msgsize_ = 0;
+ this->next_slice_updtime_ = curr_time + 5000;
+ }
+ // Perform different strategies based on error codes
+ switch (errcode) {
+ case err_code::kErrNotFound:
+ case err_code::kErrSuccess:
+ if (msg_size == 0 && errcode != err_code::kErrSuccess) {
+ this->total_zero_cnt_ += 1;
+ } else {
+ this->total_zero_cnt_ = 0;
+ }
+ if (this->total_zero_cnt_ > 0) {
+ if (group_flowctrl_handler.GetMinZeroCnt() != config::kMaxIntValue) {
+ return (int64_t)group_flowctrl_handler.GetCurFreqLimitTime(
+ this->total_zero_cnt_, (int32_t)rsp_dlt_limit);
+ } else {
+ return (int64_t)def_flowctrl_handler.GetCurFreqLimitTime(
+ this->_totalRcvZeroCount, (int32_t)rsp_dlt_limit);
+ }
+ }
+ if (req_esc_limit) {
+ return 0;
+ } else {
+ if (this->cur_stage_msgsize_ >= this->cur_flowctrl_.GetDataSizeLimit()
+ || this->cur_slice_msgsize_ >= this->limit_slice_msgsize_) {
+ return this->cur_flowctrl_.GetFreqMsLimit() > rsp_dlt_limit
+ ? this->cur_flowctrl_.GetFreqMsLimit() : rsp_dlt_limit;
+ }
+ if (errcode == err_code::kErrSuccess) {
+ if (filter_consume && this->cur_freqctrl_.GetFreqMsLimit() >= 0) {
+ if (require_slow) {
+ return this->cur_freqctrl_.GetZeroCnt();
+ } else {
+ return this->cur_freqctrl_.GetFreqMsLimit();
+ }
+ } else if (!filter_consume && this->cur_freqctrl_.GetDataSizeLimit() >=0) {
+ return this->cur_freqctrl_.GetDataSizeLimit();
+ }
+ }
+ return rsp_dlt_limit;
+ }
+ break;
+
+ default:
+ return rsp_dlt_limit;
+ }
+}
+
+void PartitionExt::SetLastConsumed(bool last_consumed) {
+ this->is_last_consumed_ = last_consumed;
+}
+
+bool PartitionExt::IsLastConsumed() {
+ return this->is_last_consumed_;
+}
+
+void PartitionExt::resetParameters() {
+ this->is_last_consumed_ = false;
+ this->cur_flowctrl_.SetDataDltAndFreqLimit(config::kMaxLongValue, 20);
+ this->next_stage_updtime_ = 0;
+ this->next_slice_updtime_ = 0;
+ this->limit_slice_msgsize_ = 0;
+ this->cur_stage_msgsize_ = 0;
+ this->cur_slice_msgsize_ = 0;
+ this->total_zero_cnt_ = 0;
+ this->booked_time_ = 0;
+ this->booked_errcode_ = 0;
+ this->booked_esc_limit_= false;
+ this->booked_msgsize_ = 0;
+ this->booked_dlt_limit_ = 0;
+ this->booked_curdata_dlt_ = 0;
+ this->booked_require_slow_ = false;
+}
+
+
}; // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index 2fba193..bbfba96 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -31,6 +31,9 @@
namespace tubemq {
+using std::stringstream;
+
+
static const string kWhitespaceCharSet = " \n\r\t\f\v";
string Utils::Trim(const string& source) {