You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/16 06:08:57 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-282]Create C/C++ return result class (#207)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new 5d4e376 [TUBEMQ-282]Create C/C++ return result class (#207)
5d4e376 is described below
commit 5d4e37647a083f8505c33f9dfae894e4f86f6021
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Jul 16 06:08:42 2020 +0000
[TUBEMQ-282]Create C/C++ return result class (#207)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../include/tubemq/const_config.h | 22 ++--
.../include/tubemq/executor_pool.h | 3 +-
.../include/tubemq/rmt_data_cache.h | 3 +-
.../include/tubemq/tubemq_return.h | 95 ++++++++++++++
.../tubemq-client-cpp/include/tubemq/version.h | 2 +-
.../tubemq-client-cpp/src/client_subinfo.cc | 4 +-
.../tubemq-client-cpp/src/file_ini.cc | 14 +-
.../tubemq-client-cpp/src/message.cc | 4 +-
.../tubemq-client-cpp/src/meta_info.cc | 2 +-
.../tubemq-client-cpp/src/rmt_data_cache.cc | 19 ++-
.../tubemq-client-cpp/src/tubemq_return.cc | 143 +++++++++++++++++++++
tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 2 +-
12 files changed, 276 insertions(+), 37 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_config.h
index bc90194..d843929 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_config.h
@@ -76,20 +76,20 @@ static const int32_t kInvalidValue = -2;
} // namespace tb_config
namespace delimiter {
-static const string kDelimiterDot = ".";
-static const string kDelimiterEqual = "=";
-static const string kDelimiterAnd = "&";
-static const string kDelimiterComma = ",";
-static const string kDelimiterColon = ":";
-static const string kDelimiterAt = "@";
-static const string kDelimiterPound = "#";
-static const string kDelimiterSemicolon = ";";
+static const char kDelimiterDot[] = ".";
+static const char kDelimiterEqual[] = "=";
+static const char kDelimiterAnd[] = "&";
+static const char kDelimiterComma[] = ",";
+static const char kDelimiterColon[] = ":";
+static const char kDelimiterAt[] = "@";
+static const char kDelimiterPound[] = "#";
+static const char kDelimiterSemicolon[] = ";";
// Double slash
-static const string kDelimiterDbSlash = "//";
+static const char kDelimiterDbSlash[] = "//";
// left square bracket
-static const string kDelimiterLftSB = "[";
+static const char kDelimiterLftSB[] = "[";
// right square bracket
-static const string kDelimiterRgtSB = "]";
+static const char kDelimiterRgtSB[] = "]";
} // namespace delimiter
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
index a5208a4..404616b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
@@ -85,6 +85,7 @@ class ExecutorPool : noncopyable {
};
typedef std::shared_ptr<ExecutorPool> ExecutorPoolPtr;
+
} // namespace tubemq
-#endif //_TUBEMQ_EXECUTOR_POOL_
+#endif // _TUBEMQ_EXECUTOR_POOL_
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index 2b79f13..aa8f250 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -48,7 +48,7 @@ class RmtDataCacheCsm {
public:
RmtDataCacheCsm(const string& client_id, const string& group_name);
~RmtDataCacheCsm();
- void UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
+ void UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
const string& flowctrl_info);
void UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
int64_t flowctrl_id, const string& flowctrl_info);
@@ -109,7 +109,6 @@ class RmtDataCacheCsm {
FlowCtrlRuleHandler def_flowctrl_handler_;
AtomicBoolean under_groupctrl_;
AtomicLong last_checktime_;
-
// meta info
pthread_rwlock_t meta_rw_lock_;
// partiton allocated map
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
new file mode 100644
index 0000000..96194b4
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_return.h
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_RETURN_H_
+#define TUBEMQ_CLIENT_RETURN_H_
+
+#include <stdlib.h>
+
+#include <list>
+#include <string>
+
+#include "tubemq/message.h"
+#include "tubemq/meta_info.h"
+
+
+
+
+namespace tubemq {
+
+using std::string;
+
+
+class PeerInfo {  // message source info: partition id, broker host, partition key, offset
+ public:
+  PeerInfo();
+  PeerInfo(const Partition& partition, int64_t offset);  // same effect as SetMsgSourceInfo
+  PeerInfo& operator=(const PeerInfo& target);
+  void SetMsgSourceInfo(const Partition& partition, int64_t offset);  // replaces all fields
+  uint32_t GetPartitionId() const { return partition_id_; }  // by value: no top-level const
+  const string& GetBrokerHost() const { return broker_host_; }
+  const string& GetPartitionKey() const { return partition_key_; }
+  int64_t GetCurrOffset() const { return curr_offset_; }  // by value: no top-level const
+
+ private:
+  uint32_t partition_id_;
+  string broker_host_;
+  string partition_key_;
+  int64_t curr_offset_;  // taken from the |offset| argument, not read from the Partition
+};
+
+
+
+// Consumer return result: status, error info, topic, peer, confirm context, messages.
+class ConsumerResult {
+ public:
+  ConsumerResult();
+  ConsumerResult(const ConsumerResult& target);
+  ConsumerResult(int32_t err_code, string err_msg);
+  ~ConsumerResult();
+  ConsumerResult& operator=(const ConsumerResult& target);
+  void SetFailureResult(int32_t err_code, string err_msg);
+  void SetFailureResult(int32_t err_code, string err_msg,
+    const string& topic_name, const PeerInfo& peer_info);
+  void SetSuccessResult(int32_t err_code, const string& topic_name, const PeerInfo& peer_info,
+    const string& confirm_context, const std::list<Message>& message_list);
+  bool IsSuccess() const { return success_; }  // const so it works on const references
+  int32_t GetErrCode() const { return err_code_; }  // by value: no top-level const
+  const string& GetErrMessage() const { return err_msg_; }
+  const string& GetTopicName() const { return topic_name_; }
+  const PeerInfo& GetPeerInfo() const { return peer_info_; }
+  const string& GetConfirmContext() const { return confirm_context_; }
+  const std::list<Message>& GetMessageList() const { return message_list_; }
+  const string& GetPartitionKey() const;  // forwards to peer_info_
+  const int64_t GetCurrOffset() const;  // const kept: must match the .cc definition
+
+ private:
+  bool success_;
+  int32_t err_code_;
+  string err_msg_;
+  string topic_name_;
+  PeerInfo peer_info_;
+  string confirm_context_;
+  std::list<Message> message_list_;  // std:: spelled out; <list> is included by this header
+};
+
+}
+
+#endif // TUBEMQ_CLIENT_RETURN_H_
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/version.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/version.h
index 4aede43..c479ede 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/version.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/version.h
@@ -26,7 +26,7 @@ namespace tubemq {
using std::string;
-static const string kTubeMQClientVersion = "0.1.0-0.5.0";
+static const char kTubeMQClientVersion[] = "0.1.0-0.5.0";
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
index 0c8064c..9e959e6 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
@@ -62,7 +62,7 @@ void ClientSubInfo::SetConsumeTarget(bool bound_consume,
} else {
topic_filter_map_[it_topic->first] = true;
- //build topic conditions
+ // build topic conditions
count = 0;
tmpstr = it_topic->first;
tmpstr += delimiter::kDelimiterPound;
@@ -77,7 +77,7 @@ void ClientSubInfo::SetConsumeTarget(bool bound_consume,
}
}
- //build bound_partition info
+ // build bound_partition info
if (bound_consume) {
session_key_ = session_key;
source_count_ = source_count;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
index 346dc10..b073cde 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
@@ -59,6 +59,9 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
string sector = "";
string key = "";
string value = "";
+ string lftsb = delimiter::kDelimiterLftSB;
+ string rgtsb = delimiter::kDelimiterRgtSB;
+ string equal = delimiter::kDelimiterEqual;
string::size_type lftsb_pos = 0;
string::size_type rgtsb_pos = 0;
string::size_type equal_pos = 0;
@@ -71,21 +74,20 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
continue;
}
// check if a sector head
- lftsb_pos = line_str.find(delimiter::kDelimiterLftSB);
- rgtsb_pos = line_str.find(delimiter::kDelimiterRgtSB);
+ lftsb_pos = line_str.find(lftsb);
+ rgtsb_pos = line_str.find(rgtsb);
if (lftsb_pos != string::npos && rgtsb_pos != string::npos) {
- sector = line_str.substr(lftsb_pos + (delimiter::kDelimiterLftSB).size(),
- rgtsb_pos - (delimiter::kDelimiterRgtSB).size());
+ sector = line_str.substr(lftsb_pos + lftsb.size(), rgtsb_pos - rgtsb.size());
sector = Utils::Trim(sector);
continue;
}
// check if a key=value string
- equal_pos = line_str.find(delimiter::kDelimiterEqual);
+ equal_pos = line_str.find(equal);
if (equal_pos == string::npos) {
continue;
}
key = line_str.substr(0, equal_pos);
- value = line_str.substr(equal_pos + (delimiter::kDelimiterEqual).size(), line_str.size());
+ value = line_str.substr(equal_pos + equal.size(), line_str.size());
key = Utils::Trim(key);
value = Utils::Trim(value);
// get data from file to memory
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/message.cc b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
index 9d117cd..93e3dd2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
@@ -34,9 +34,9 @@ using std::stringstream;
// message flag's properties settings
static const int32_t kMsgFlagIncProperties = 0x01;
// reserved property key Filter Item
-static const string kRsvPropKeyFilterItem = "$msgType$";
+static const char kRsvPropKeyFilterItem[] = "$msgType$";
// reserved property key message send time
-static const string kRsvPropKeyMsgTime = "$msgTime$";
+static const char kRsvPropKeyMsgTime[] = "$msgTime$";
Message::Message() {
this->topic_ = "";
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 7694c5b..ae7d72e 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -265,7 +265,7 @@ PartitionExt& PartitionExt::operator=(const PartitionExt& target) {
if (this != &target) {
// parent class
Partition::operator=(target);
- // child class
+ // child class
this->is_last_consumed_ = target.is_last_consumed_;
this->cur_flowctrl_ = target.cur_flowctrl_;
this->cur_freqctrl_ = target.cur_freqctrl_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index 2164fa9..ec4876b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -19,9 +19,8 @@
#include "tubemq/rmt_data_cache.h"
-#include <string>
-
#include <stdlib.h>
+#include <string>
#include "tubemq/const_config.h"
#include "tubemq/meta_info.h"
@@ -57,24 +56,24 @@ RmtDataCacheCsm::~RmtDataCacheCsm() {
pthread_rwlock_destroy(&meta_rw_lock_);
}
+
void RmtDataCacheCsm::UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
const string& flowctrl_info) {
if (flowctrl_id != def_flowctrl_handler_.GetFlowCtrlId()) {
- def_flowctrl_handler_.UpdateDefFlowCtrlInfo(true,
+ def_flowctrl_handler_.UpdateDefFlowCtrlInfo(true,
tb_config::kInvalidValue, flowctrl_id, flowctrl_info);
}
}
void RmtDataCacheCsm::UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
int64_t flowctrl_id, const string& flowctrl_info) {
if (flowctrl_id != group_flowctrl_handler_.GetFlowCtrlId()) {
- group_flowctrl_handler_.UpdateDefFlowCtrlInfo(false,
+ group_flowctrl_handler_.UpdateDefFlowCtrlInfo(false,
qyrpriority_id, flowctrl_id, flowctrl_info);
}
if (qyrpriority_id != group_flowctrl_handler_.GetQryPriorityId()) {
this->group_flowctrl_handler_.SetQryPriorityId(qyrpriority_id);
-
}
- // update current if under group flowctrl
+ // update current if under group flowctrl
int64_t cur_time = Utils::GetCurrentTimeMillis();
if (cur_time - last_checktime_.Get() > 10000) {
FlowCtrlResult flowctrl_result;
@@ -176,9 +175,9 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
return result;
}
-void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
int64_t curr_offset, int32_t err_code, bool esc_limit,
- int32_t msg_size,int64_t limit_dlt, int64_t cur_data_dlt,
+ int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt,
bool require_slow) {
map<string, PartitionExt>::iterator it_part;
// book partition offset info
@@ -287,7 +286,7 @@ void RmtDataCacheCsm::GetAllBrokerPartitions(
bool RmtDataCacheCsm::GetPartitionExt(const string& part_key, PartitionExt& partition_ext) {
bool result = false;
map<string, PartitionExt>::iterator it_map;
-
+
pthread_rwlock_rdlock(&meta_rw_lock_);
it_map = partitions_.find(part_key);
if (it_map != partitions_.end()) {
@@ -314,7 +313,7 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
set<string>::iterator it_key;
map<NodeInfo, set<string> >::iterator it_broker;
map<string, PartitionExt>::iterator it_part;
-
+
partition_list.clear();
pthread_rwlock_rdlock(&meta_rw_lock_);
it_broker = broker_partition_.find(broker_info);
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
new file mode 100644
index 0000000..5298606
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tubemq/tubemq_return.h"
+#include "tubemq/const_config.h"
+
+
+
+namespace tubemq {
+
+
+// Default peer: id 0, empty host and key, offset set to tb_config::kInvalidValue.
+PeerInfo::PeerInfo()
+    : partition_id_(0),
+      broker_host_(""),
+      partition_key_(""),
+      curr_offset_(tb_config::kInvalidValue) {}
+
+PeerInfo::PeerInfo(const Partition& partition, int64_t offset)  // mirrors SetMsgSourceInfo()
+    : partition_id_(partition.GetPartitionId()), broker_host_(partition.GetBrokerHost()),
+      partition_key_(partition.GetPartitionKey()), curr_offset_(offset) {}
+
+// Copies the four fields from |target|; self-assignment returns immediately.
+PeerInfo& PeerInfo::operator=(const PeerInfo& target) {
+  if (this == &target) return *this;
+  partition_id_ = target.partition_id_;
+  broker_host_ = target.broker_host_;
+  partition_key_ = target.partition_key_;
+  curr_offset_ = target.curr_offset_;
+  return *this;
+}
+
+void PeerInfo::SetMsgSourceInfo(const Partition& partition, int64_t offset) {
+  this->partition_id_ = partition.GetPartitionId();  // identity comes from |partition|
+  this->broker_host_ = partition.GetBrokerHost();
+  this->partition_key_ = partition.GetPartitionKey();
+  this->curr_offset_ = offset;  // supplied by the caller, not read from |partition|
+}
+
+// Starts as "not successful" with err_code_ = kInvalidValue and empty strings.
+ConsumerResult::ConsumerResult()
+    : success_(false),
+      err_code_(tb_config::kInvalidValue),
+      err_msg_(""),
+      topic_name_(""),
+      confirm_context_("") {}
+
+// Member-wise copy of |target|, including its peer info and message list.
+ConsumerResult::ConsumerResult(const ConsumerResult& target)
+    : success_(target.success_),
+      err_code_(target.err_code_),
+      err_msg_(target.err_msg_),
+      topic_name_(target.topic_name_),
+      peer_info_(target.peer_info_),
+      confirm_context_(target.confirm_context_),
+      message_list_(target.message_list_) {}
+
+// Failure result holding |err_code| and |err_msg|; topic and confirm context stay empty.
+ConsumerResult::ConsumerResult(int32_t err_code, string err_msg)
+    : success_(false),
+      err_code_(err_code),
+      err_msg_(err_msg),
+      topic_name_(""),
+      confirm_context_("") {}
+
+ConsumerResult::~ConsumerResult() {  // scrub state; member destructors then run as usual
+  message_list_.clear();
+  success_ = false;
+  err_code_ = tb_config::kInvalidValue;
+  err_msg_.clear();
+  topic_name_.clear();
+  confirm_context_.clear();
+}
+
+// Member-wise copy assignment from |target|; self-assignment changes nothing.
+ConsumerResult& ConsumerResult::operator=(const ConsumerResult& target) {
+  if (this == &target) return *this;
+  success_ = target.success_;
+  err_code_ = target.err_code_;
+  err_msg_ = target.err_msg_;
+  topic_name_ = target.topic_name_;
+  peer_info_ = target.peer_info_;
+  confirm_context_ = target.confirm_context_;
+  message_list_ = target.message_list_;
+  return *this;
+}
+
+void ConsumerResult::SetFailureResult(int32_t err_code, string err_msg) {
+  this->success_ = false;  // topic, peer info, confirm context and messages are untouched
+  this->err_code_ = err_code;
+  this->err_msg_ = err_msg;
+}
+
+void ConsumerResult::SetFailureResult(int32_t err_code, string err_msg,
+                const string& topic_name, const PeerInfo& peer_info) {
+  // The two-argument overload sets the status and error fields; this variant
+  // additionally records the topic and peer the failure belongs to.
+  SetFailureResult(err_code, err_msg);
+  topic_name_ = topic_name;
+  peer_info_ = peer_info;
+}
+
+// Marks the result successful: stores |err_code|, sets the message to "Ok" and
+// replaces topic, peer info, confirm context and message list with the arguments.
+void ConsumerResult::SetSuccessResult(int32_t err_code, const string& topic_name,
+                                      const PeerInfo& peer_info, const string& confirm_context,
+                                      const list<Message>& message_list) {
+  success_ = true;
+  err_code_ = err_code;
+  err_msg_ = "Ok";
+  topic_name_ = topic_name;
+  peer_info_ = peer_info;
+  confirm_context_ = confirm_context;
+  message_list_ = message_list;
+}
+
+const string& ConsumerResult::GetPartitionKey() const {
+  return GetPeerInfo().GetPartitionKey();  // convenience accessor into the stored peer info
+}
+
+const int64_t ConsumerResult::GetCurrOffset() const {
+  return GetPeerInfo().GetCurrOffset();  // convenience accessor into the stored peer info
+}
+
+
+} // namespace tubemq
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index e48c8b6..40d1de5 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -34,7 +34,7 @@ namespace tubemq {
using std::stringstream;
-static const string kWhitespaceCharSet = " \n\r\t\f\v";
+static const char kWhitespaceCharSet[] = " \n\r\t\f\v";
string Utils::Trim(const string& source) {
string target = source;