You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/02 08:56:59 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-262]Create C++ flow control handler
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new 9672cf1 [TUBEMQ-262]Create C++ flow control handler
9672cf1 is described below
commit 9672cf128b28c2b1070ed639b0b24e01c7a66018
Author: gosonzhang <go...@tencent.com>
AuthorDate: Thu Jul 2 16:56:38 2020 +0800
[TUBEMQ-262]Create C++ flow control handler
---
.../tubemq-client-cpp/inc/atomic_def.h | 277 +++++++++++++++++++++
.../tubemq-client-cpp/inc/const_config.h | 9 +
tubemq-client-twins/tubemq-client-cpp/inc/utils.h | 1 +
.../tubemq-client-cpp/src/client_config.cc | 47 ++--
tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 6 +
5 files changed, 317 insertions(+), 23 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h b/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h
new file mode 100644
index 0000000..24df5f3
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_CLIENT_ATOMIC_DEF_H_
+#define _TUBEMQ_CLIENT_ATOMIC_DEF_H_
+
+#include <stdlib.h>
+
+
+
+namespace tubemq {
+
+using namespace std;
+
+/**
+ * Integer counter whose read-modify-write operations go through the GCC
+ * __sync atomic builtins. Get() and Set() are plain volatile accesses and
+ * issue no memory barrier of their own.
+ */
+class AtomicInteger {
+ public:
+  /** Creates a counter holding 0. */
+  AtomicInteger() : counter_(0) {}
+
+  /** Creates a counter holding initial_value. */
+  AtomicInteger(int initial_value) : counter_(initial_value) {}
+
+  /** Returns the current value (plain volatile load). */
+  int Get() const { return this->counter_; }
+
+  /**
+   * Overwrites the current value (plain volatile store).
+   * The parameter is int, the width of the stored counter, so a wider
+   * argument is narrowed at the call site rather than silently in here.
+   */
+  void Set(int new_value) {
+    this->counter_ = new_value;
+  }
+
+  /** Returns the current value widened to long. */
+  long LongValue() const { return static_cast<long>(this->counter_); }
+
+  /** Atomically stores new_value and returns the value it replaced. */
+  int GetAndSet(int new_value) {
+    for ( ; ; ) {
+      // Retry when another thread changed the value between load and swap.
+      int current = this->counter_;
+      if (__sync_bool_compare_and_swap(&this->counter_, current, new_value)) {
+        return current;
+      }
+    }
+  }
+
+  /**
+   * Atomically replaces the value with update if it currently equals expect.
+   * @return true when the swap happened, false otherwise.
+   */
+  bool CompareAndSet(int expect, int update) {
+    return __sync_bool_compare_and_swap(&this->counter_, expect, update);
+  }
+
+  /**
+   * Atomically adds 1.
+   * @return the value held before the addition.
+   */
+  int GetAndIncrement() {
+    return __sync_fetch_and_add(&this->counter_, 1);
+  }
+
+  /**
+   * Atomically subtracts 1.
+   * @return the value held before the subtraction.
+   */
+  int GetAndDecrement() {
+    return __sync_fetch_and_sub(&this->counter_, 1);
+  }
+
+  /**
+   * Atomically adds delta.
+   * @return the value held before the addition.
+   */
+  int GetAndAdd(int delta) {
+    return __sync_fetch_and_add(&this->counter_, delta);
+  }
+
+  /**
+   * Atomically adds 1.
+   * @return the value held after the addition.
+   */
+  int IncrementAndGet() {
+    return __sync_add_and_fetch(&this->counter_, 1);
+  }
+
+  /**
+   * Atomically subtracts 1.
+   * @return the value held after the subtraction.
+   */
+  int DecrementAndGet() {
+    return __sync_sub_and_fetch(&this->counter_, 1);
+  }
+
+  /**
+   * Atomically adds delta.
+   * @return the value held after the addition.
+   */
+  int AddAndGet(int delta) {
+    return __sync_add_and_fetch(&this->counter_, delta);
+  }
+
+ private:
+  volatile int counter_;
+};
+
+
+/**
+ * Long counter whose read-modify-write operations go through the GCC
+ * __sync atomic builtins. Get() and Set() are plain volatile accesses and
+ * issue no memory barrier of their own.
+ */
+class AtomicLong {
+ public:
+  /** Creates a counter holding 0. */
+  AtomicLong() : counter_(0) {}
+
+  /** Creates a counter holding initial_value. */
+  AtomicLong(long initial_value) : counter_(initial_value) {}
+
+  /** Returns the current value (plain volatile load). */
+  long Get() const { return this->counter_; }
+
+  /** Overwrites the current value (plain volatile store). */
+  void Set(long new_value) {
+    this->counter_ = new_value;
+  }
+
+  /**
+   * Returns the current value narrowed to int.
+   * The declared return type is int to match the narrowing cast; the
+   * result still converts implicitly wherever a long was expected.
+   */
+  int IntValue() const { return static_cast<int>(this->counter_); }
+
+  /** Atomically stores new_value and returns the value it replaced. */
+  long GetAndSet(long new_value) {
+    for ( ; ; ) {
+      // Retry when another thread changed the value between load and swap.
+      long current = this->counter_;
+      if (__sync_bool_compare_and_swap(&this->counter_, current, new_value)) {
+        return current;
+      }
+    }
+  }
+
+  /**
+   * Atomically replaces the value with update if it currently equals expect.
+   * @return true when the swap happened, false otherwise.
+   */
+  bool CompareAndSet(long expect, long update) {
+    return __sync_bool_compare_and_swap(&this->counter_, expect, update);
+  }
+
+  /**
+   * Atomically adds 1.
+   * @return the value held before the addition.
+   */
+  long GetAndIncrement() {
+    return __sync_fetch_and_add(&this->counter_, 1L);
+  }
+
+  /**
+   * Atomically subtracts 1.
+   * @return the value held before the subtraction.
+   */
+  long GetAndDecrement() {
+    return __sync_fetch_and_sub(&this->counter_, 1L);
+  }
+
+  /**
+   * Atomically adds delta.
+   * @return the value held before the addition.
+   */
+  long GetAndAdd(long delta) {
+    return __sync_fetch_and_add(&this->counter_, delta);
+  }
+
+  /**
+   * Atomically adds 1.
+   * @return the value held after the addition.
+   */
+  long IncrementAndGet() {
+    return __sync_add_and_fetch(&this->counter_, 1L);
+  }
+
+  /**
+   * Atomically subtracts 1.
+   * @return the value held after the subtraction.
+   */
+  long DecrementAndGet() {
+    return __sync_sub_and_fetch(&this->counter_, 1L);
+  }
+
+  /**
+   * Atomically adds delta.
+   * @return the value held after the addition.
+   */
+  long AddAndGet(long delta) {
+    return __sync_add_and_fetch(&this->counter_, delta);
+  }
+
+ private:
+  volatile long counter_;
+};
+
+
+/**
+ * Boolean flag kept in an int (0 is false, 1 is true).
+ */
+class AtomicBoolean {
+ public:
+  /** Creates a flag holding false. */
+  AtomicBoolean() : counter_(0) {}
+
+  /** Creates a flag holding initial_value. */
+  AtomicBoolean(bool initial_value) : counter_(initial_value ? 1 : 0) {}
+
+  /** Returns the current flag (plain volatile load). */
+  bool Get() { return 0 != this->counter_; }
+
+  /** Overwrites the flag (plain volatile store). */
+  void Set(bool new_value) { this->counter_ = new_value ? 1 : 0; }
+
+  /** Atomically stores new_value and returns the flag it replaced. */
+  bool GetAndSet(bool new_value) {
+    const int desired = new_value ? 1 : 0;
+    int observed;
+    do {
+      observed = this->counter_ ? 1 : 0;
+    } while (!__sync_bool_compare_and_swap(&this->counter_, observed, desired));
+    return observed != 0;
+  }
+
+  /** Atomically sets the flag to update if it currently equals expect. */
+  bool CompareAndSet(bool expect, bool update) {
+    const int expected = expect ? 1 : 0;
+    const int desired = update ? 1 : 0;
+    return __sync_bool_compare_and_swap(&this->counter_, expected, desired);
+  }
+
+ private:
+  volatile int counter_;
+};
+
+
+
+
+
+}
+
+
+#endif
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h b/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h
index 1f2e655..5ec1709 100644
--- a/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h
@@ -63,6 +63,15 @@ static const int kConfirmWaitPeriodMsMax = 60000;
// default rebalance wait if shutdown meeting
static const int kRebWaitPeriodWhenShutdownMs = 10000;
+// max int value
+static const int kMaxIntValue = 0x7fffffff;
+// max long value (the literal needs a 64-bit type, so it cannot be int)
+static const long kMaxLongValue = 0x7fffffffffffffffL;
+
+// invalid value
+static const int kInvalidValue = -2;
+
+
} // namespace config
diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/utils.h b/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
index 9326449..de39683 100644
--- a/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
@@ -47,6 +47,7 @@ class Utils {
const string& group_name, string& tgt_group_name);
static bool ValidFilterItem(string& err_info,
const string& src_filteritem, string& tgt_filteritem);
+ static long GetCurrentTimeMillis();
};
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
index 84d6721..f7fde92 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
@@ -344,26 +344,27 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu
return false;
}
map<string, set<string> > tmp_sub_map;
- map<string, set<string> >::const_iterator itMap;
- for (itMap = subscribed_topic_and_filter_map.begin(); itMap != subscribed_topic_and_filter_map.end(); ++itMap) {
+ map<string, set<string> >::const_iterator it_map;
+ for (it_map = subscribed_topic_and_filter_map.begin();
+ it_map != subscribed_topic_and_filter_map.end(); ++it_map) {
int count=0;
string tmp_filteritem;
set<string> tgt_filters;
// check topic_name info
- is_success = Utils::ValidString(err_info, itMap->first,
+ is_success = Utils::ValidString(err_info, it_map->first,
false, true, true, config::kTopicNameMaxLength);
if (!is_success) {
stringstream ss;
ss << "Check parameter subscribed_topic_and_filter_map error: topic ";
- ss << itMap->first;
+ ss << it_map->first;
ss << " ";
ss << err_info;
err_info = ss.str();
return false;
}
- string topic_name = Utils::Trim(itMap->first);
+ string topic_name = Utils::Trim(it_map->first);
// check filter info
- set<string> subscribed_filters = itMap->second;
+ set<string> subscribed_filters = it_map->second;
for (set<string>::iterator it = subscribed_filters.begin(); it != subscribed_filters.end(); ++it) {
is_success = Utils::ValidFilterItem(err_info, *it, tmp_filteritem);
if (!is_success) {
@@ -381,7 +382,7 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu
if (count > config::kFilterItemMaxCount) {
stringstream ss;
ss << "Check parameter subscribed_topic_and_filter_map error: topic ";
- ss << itMap->first;
+ ss << it_map->first;
ss << "'s filter item over max item count : ";
ss << config::kFilterItemMaxCount;
err_info = ss.str();
@@ -419,14 +420,14 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu
// check part_offset_map
string part_key;
map<string, long> tmp_parts_map;
- map<string, long>::const_iterator itPart;
- for (itPart = part_offset_map.begin(); itPart != part_offset_map.end(); ++itPart) {
+ map<string, long>::const_iterator it_part;
+ for (it_part = part_offset_map.begin(); it_part != part_offset_map.end(); ++it_part) {
vector<string> result;
- Utils::Split(itPart->first, result, delimiter::kDelimiterColon);
+ Utils::Split(it_part->first, result, delimiter::kDelimiterColon);
if (result.size() != 3) {
stringstream ss;
ss << "Illegal parameter: part_offset_map's key ";
- ss << itPart->first;
+ ss << it_part->first;
ss << " format error, value must be aaaa:bbbb:cccc !";
err_info = ss.str();
return false;
@@ -434,34 +435,34 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu
- if (tmp_sub_map.find(result[1]) != tmp_sub_map.end()) {
+ if (tmp_sub_map.find(result[1]) == tmp_sub_map.end()) {
stringstream ss;
ss << "Illegal parameter: ";
- ss << itPart->first;
+ ss << it_part->first;
ss << " subscribed topic ";
ss << result[1];
ss << " not included in subscribed_topic_and_filter_map's topic list!";
err_info = ss.str();
return false;
}
- if (itPart->first.find_first_of(delimiter::kDelimiterComma) != string::npos) {
+ if (it_part->first.find_first_of(delimiter::kDelimiterComma) != string::npos) {
stringstream ss;
ss << "Illegal parameter: key ";
- ss << itPart->first;
+ ss << it_part->first;
ss << " include ";
ss << delimiter::kDelimiterComma;
ss << " char!";
err_info = ss.str();
return false;
}
- if (itPart->second < 0) {
+ if (it_part->second < 0) {
stringstream ss;
ss << "Illegal parameter: ";
- ss << itPart->first;
+ ss << it_part->first;
ss << "'s offset must over or equal zero, value is ";
- ss << itPart->second;
+ ss << it_part->second;
err_info = ss.str();
return false;
}
Utils::Join(result, delimiter::kDelimiterColon, part_key);
- tmp_parts_map[part_key] = itPart->second;
+ tmp_parts_map[part_key] = it_part->second;
}
// set verified data
this->is_bound_consume_ = true;
@@ -551,7 +552,7 @@ string ConsumerConfig::ToString() {
int i = 0;
stringstream ss;
map<string, long>::iterator it;
- map<string, set<string> >::iterator itMap;
+ map<string, set<string> >::iterator it_map;
// print info
ss << "ConsumerConfig = {";
@@ -559,8 +560,8 @@ string ConsumerConfig::ToString() {
ss << ", group_name_='";
ss << this->group_name_;
ss << "', sub_topic_and_filter_map_={";
- for (itMap = this->sub_topic_and_filter_map_.begin();
- itMap != this->sub_topic_and_filter_map_.end(); ++itMap) {
+ for (it_map = this->sub_topic_and_filter_map_.begin();
+ it_map != this->sub_topic_and_filter_map_.end(); ++it_map) {
if (i++ > 0) {
ss << ",";
}
@@ -568,8 +569,8 @@ string ConsumerConfig::ToString() {
- ss << itMap->first;
+ ss << it_map->first;
ss << "'=[";
int j=0;
- set<string> topicSet = itMap->second;
- for (set<string>::iterator it = topicSet.begin(); it != topicSet.end(); ++it) {
+ set<string> topic_set = it_map->second;
+ for (set<string>::iterator it = topic_set.begin(); it != topic_set.end(); ++it) {
if (j++ > 0) {
ss << ",";
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index f9a1b1e..f8ee3c2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -237,6 +237,12 @@ bool Utils::ValidFilterItem(string& err_info,
}
+// Returns the gettimeofday() wall-clock reading converted to milliseconds.
+long Utils::GetCurrentTimeMillis() {
+  struct timeval now;
+  gettimeofday(&now, NULL);
+  return now.tv_usec / 1000 + now.tv_sec * 1000;
+}
+
}