You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/02 08:56:59 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-262]Create C++ flow control handler

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 9672cf1  [TUBEMQ-262]Create C++ flow control handler
9672cf1 is described below

commit 9672cf128b28c2b1070ed639b0b24e01c7a66018
Author: gosonzhang <go...@tencent.com>
AuthorDate: Thu Jul 2 16:56:38 2020 +0800

    [TUBEMQ-262]Create C++ flow control handler
---
 .../tubemq-client-cpp/inc/atomic_def.h             | 277 +++++++++++++++++++++
 .../tubemq-client-cpp/inc/const_config.h           |   9 +
 tubemq-client-twins/tubemq-client-cpp/inc/utils.h  |   1 +
 .../tubemq-client-cpp/src/client_config.cc         |  47 ++--
 tubemq-client-twins/tubemq-client-cpp/src/utils.cc |   6 +
 5 files changed, 317 insertions(+), 23 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h b/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h
new file mode 100644
index 0000000..24df5f3
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_CLIENT_ATOMIC_DEF_H_
+#define _TUBEMQ_CLIENT_ATOMIC_DEF_H_
+
+#include <stdlib.h>
+
+
+
+namespace tubemq {
+
+using namespace std;
+
+// Integer counter whose read-modify-write operations go through GCC's
+// __sync atomic builtins. Get() and Set() are plain accesses to the volatile
+// member and add no memory barrier of their own.
+//
+// Usage sketch: AtomicInteger n(5); n.IncrementAndGet();  // yields 6
+class AtomicInteger {
+ public:
+  // Creates a counter holding 0.
+  AtomicInteger() : counter_(0) {}
+
+  // Creates a counter holding initial_value. Left non-explicit so existing
+  // code that converts from an int implicitly keeps compiling.
+  AtomicInteger(int initial_value) : counter_(initial_value) {}
+
+  // Returns the current value (a plain read of the volatile member).
+  int Get() const {
+    return counter_;
+  }
+
+  // Replaces the current value with new_value (a plain volatile write).
+  // The parameter is an int, the same width as the counter, so a wider
+  // argument is narrowed at the call site, where the compiler can warn about
+  // it, instead of silently inside this method.
+  void Set(int new_value) {
+    counter_ = new_value;
+  }
+
+  // Returns the current value converted to long.
+  long LongValue() const {
+    return static_cast<long>(counter_);
+  }
+
+  // Atomically replaces the current value with new_value.
+  // Returns the value that was replaced.
+  int GetAndSet(int new_value) {
+    for ( ; ; ) {
+      const int current = counter_;
+      // The swap succeeds only if the counter still holds what was just
+      // read; if another writer got in between, read again and retry.
+      if (__sync_bool_compare_and_swap(&counter_, current, new_value)) {
+        return current;
+      }
+    }
+  }
+
+  // Atomically stores update if, and only if, the current value equals
+  // expect.
+  // Returns true when the store happened; returns false, leaving the counter
+  // untouched, when the current value differed from expect.
+  bool CompareAndSet(int expect, int update) {
+    return __sync_bool_compare_and_swap(&counter_, expect, update);
+  }
+
+  // Each arithmetic operation below is a single __sync builtin that performs
+  // the read, the arithmetic and the write as one atomic step, so no
+  // compare-and-swap retry loop is needed.
+
+  // Atomically adds 1.
+  // Returns the value held before the addition.
+  int GetAndIncrement() {
+    return __sync_fetch_and_add(&counter_, 1);
+  }
+
+  // Atomically subtracts 1.
+  // Returns the value held before the subtraction.
+  int GetAndDecrement() {
+    return __sync_fetch_and_sub(&counter_, 1);
+  }
+
+  // Atomically adds delta, which may be negative.
+  // Returns the value held before the addition.
+  int GetAndAdd(int delta) {
+    return __sync_fetch_and_add(&counter_, delta);
+  }
+
+  // Atomically adds 1.
+  // Returns the resulting value.
+  int IncrementAndGet() {
+    return __sync_add_and_fetch(&counter_, 1);
+  }
+
+  // Atomically subtracts 1.
+  // Returns the resulting value.
+  int DecrementAndGet() {
+    return __sync_sub_and_fetch(&counter_, 1);
+  }
+
+  // Atomically adds delta, which may be negative.
+  // Returns the resulting value.
+  int AddAndGet(int delta) {
+    return __sync_add_and_fetch(&counter_, delta);
+  }
+
+ private:
+  // volatile makes every Get()/Set() access a real load or store; the
+  // atomicity of the other operations comes from the __sync builtins.
+  volatile int counter_;
+};
+
+
+// Long counter whose read-modify-write operations go through GCC's __sync
+// atomic builtins. Get() and Set() are plain accesses to the volatile member
+// and add no memory barrier of their own.
+//
+// Usage sketch: AtomicLong n(5); n.AddAndGet(10);  // yields 15
+class AtomicLong {
+ public:
+  // Creates a counter holding 0.
+  AtomicLong() : counter_(0) {}
+
+  // Creates a counter holding initial_value. Left non-explicit so existing
+  // code that converts from a long implicitly keeps compiling.
+  AtomicLong(long initial_value) : counter_(initial_value) {}
+
+  // Returns the current value (a plain read of the volatile member).
+  long Get() const {
+    return counter_;
+  }
+
+  // Replaces the current value with new_value (a plain volatile write).
+  void Set(long new_value) {
+    counter_ = new_value;
+  }
+
+  // Returns the current value narrowed to int; a value outside the int range
+  // does not survive the conversion unchanged. The return type is int so the
+  // narrowing is visible in the signature instead of being hidden behind a
+  // long that can never hold more than an int's worth of data.
+  int IntValue() const {
+    return static_cast<int>(counter_);
+  }
+
+  // Atomically replaces the current value with new_value.
+  // Returns the value that was replaced.
+  long GetAndSet(long new_value) {
+    for ( ; ; ) {
+      const long current = counter_;
+      // The swap succeeds only if the counter still holds what was just
+      // read; if another writer got in between, read again and retry.
+      if (__sync_bool_compare_and_swap(&counter_, current, new_value)) {
+        return current;
+      }
+    }
+  }
+
+  // Atomically stores update if, and only if, the current value equals
+  // expect.
+  // Returns true when the store happened; returns false, leaving the counter
+  // untouched, when the current value differed from expect.
+  bool CompareAndSet(long expect, long update) {
+    return __sync_bool_compare_and_swap(&counter_, expect, update);
+  }
+
+  // Each arithmetic operation below is a single __sync builtin that performs
+  // the read, the arithmetic and the write as one atomic step, so no
+  // compare-and-swap retry loop is needed.
+
+  // Atomically adds 1.
+  // Returns the value held before the addition.
+  long GetAndIncrement() {
+    return __sync_fetch_and_add(&counter_, 1L);
+  }
+
+  // Atomically subtracts 1.
+  // Returns the value held before the subtraction.
+  long GetAndDecrement() {
+    return __sync_fetch_and_sub(&counter_, 1L);
+  }
+
+  // Atomically adds delta, which may be negative.
+  // Returns the value held before the addition.
+  long GetAndAdd(long delta) {
+    return __sync_fetch_and_add(&counter_, delta);
+  }
+
+  // Atomically adds 1.
+  // Returns the resulting value.
+  long IncrementAndGet() {
+    return __sync_add_and_fetch(&counter_, 1L);
+  }
+
+  // Atomically subtracts 1.
+  // Returns the resulting value.
+  long DecrementAndGet() {
+    return __sync_sub_and_fetch(&counter_, 1L);
+  }
+
+  // Atomically adds delta, which may be negative.
+  // Returns the resulting value.
+  long AddAndGet(long delta) {
+    return __sync_add_and_fetch(&counter_, delta);
+  }
+
+ private:
+  // volatile makes every Get()/Set() access a real load or store; the
+  // atomicity of the other operations comes from the __sync builtins.
+  volatile long counter_;
+};
+
+
+// Boolean flag kept in an int member (0 = false, 1 = true).
+class AtomicBoolean{
+ public:
+  // Starts out false.
+  AtomicBoolean() { counter_ = 0; }
+
+  // Starts out holding initial_value.
+  AtomicBoolean(bool initial_value) { counter_ = ToInt(initial_value); }
+
+  // Reads the flag directly from the volatile member.
+  bool Get() { return counter_ != 0; }
+
+  // Writes the flag directly to the volatile member.
+  void Set(bool new_value) { counter_ = ToInt(new_value); }
+
+  // Stores new_value, retrying the CAS until it wins; returns the old value.
+  bool GetAndSet(bool new_value) {
+    const int desired = ToInt(new_value);
+    while (true) {
+      const int observed = ToInt(counter_ != 0);
+      if (__sync_bool_compare_and_swap(&counter_, observed, desired)) {
+        return observed != 0;
+      }
+    }
+  }
+
+  // Stores update only when the flag equals expect; returns true on success.
+  bool CompareAndSet(bool expect, bool update) {
+    return __sync_bool_compare_and_swap(&counter_, ToInt(expect), ToInt(update));
+  }
+
+ private:
+  // Maps a bool onto the 0/1 encoding used by counter_.
+  static int ToInt(bool value) { return value ? 1 : 0; }
+
+  volatile int counter_;
+};
+
+
+
+
+
+}
+
+
+#endif
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h b/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h
index 1f2e655..5ec1709 100644
--- a/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h
@@ -63,6 +63,15 @@ static const int kConfirmWaitPeriodMsMax = 60000;
 // default rebalance wait if shutdown meeting
 static const int kRebWaitPeriodWhenShutdownMs = 10000;
 
+// largest value of a 32-bit signed int
+static const int kMaxIntValue = 0x7fffffff;
+// largest 64-bit signed value; declared long because an int cannot hold it
+static const long kMaxLongValue = 0x7fffffffffffffffL;
+
+// invalid value
+static const int kInvalidValue = -2;
+
+
 }  // namespace config
 
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/utils.h b/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
index 9326449..de39683 100644
--- a/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/utils.h
@@ -47,6 +47,7 @@ class Utils {
                    const string& group_name, string& tgt_group_name);
   static bool ValidFilterItem(string& err_info, 
                    const string& src_filteritem, string& tgt_filteritem);
+  static long GetCurrentTimeMillis();
 
 };
  
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
index 84d6721..f7fde92 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
@@ -344,26 +344,27 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu
     return false;
   }
   map<string, set<string> > tmp_sub_map;
-  map<string, set<string> >::const_iterator itMap;
-  for (itMap = subscribed_topic_and_filter_map.begin(); itMap != subscribed_topic_and_filter_map.end(); ++itMap) {
+  map<string, set<string> >::const_iterator it_map;
+  for (it_map = subscribed_topic_and_filter_map.begin(); 
+    it_map != subscribed_topic_and_filter_map.end(); ++it_map) {
     int count=0;
     string tmp_filteritem;
     set<string> tgt_filters;
     // check topic_name info
-    is_success = Utils::ValidString(err_info, itMap->first, 
+    is_success = Utils::ValidString(err_info, it_map->first, 
                          false, true, true, config::kTopicNameMaxLength);  
     if (!is_success) {
       stringstream ss;
       ss << "Check parameter subscribed_topic_and_filter_map error: topic ";
-      ss << itMap->first;
+      ss << it_map->first;
       ss << " ";
       ss << err_info;
       err_info = ss.str();
       return false;
     }
-    string topic_name = Utils::Trim(itMap->first);
+    string topic_name = Utils::Trim(it_map->first);
     // check filter info
-    set<string> subscribed_filters = itMap->second;
+    set<string> subscribed_filters = it_map->second;
     for (set<string>::iterator it = subscribed_filters.begin(); it != subscribed_filters.end(); ++it) {
       is_success = Utils::ValidFilterItem(err_info, *it, tmp_filteritem);
       if (!is_success) {
@@ -381,7 +382,7 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu
     if (count > config::kFilterItemMaxCount) {
       stringstream ss;
       ss << "Check parameter subscribed_topic_and_filter_map error: topic ";
-      ss << itMap->first;
+      ss << it_map->first;
       ss << "'s filter item over max item count : ";
       ss << config::kFilterItemMaxCount;
       err_info = ss.str();
@@ -419,14 +420,14 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu
   // check part_offset_map
   string part_key;
   map<string, long> tmp_parts_map;
-  map<string, long>::const_iterator itPart;
-  for (itPart = part_offset_map.begin(); itPart != part_offset_map.end(); ++itPart) {
+  map<string, long>::const_iterator it_part;
+  for (itPart = part_offset_map.begin(); it_part != part_offset_map.end(); ++it_part) {
     vector<string> result;
-    Utils::Split(itPart->first, result, delimiter::kDelimiterColon);
+    Utils::Split(it_part->first, result, delimiter::kDelimiterColon);
     if (result.size() != 3) {
       stringstream ss;
       ss << "Illegal parameter: part_offset_map's key ";
-      ss << itPart->first;
+      ss << it_part->first;
       ss << " format error, value must be aaaa:bbbb:cccc !";
       err_info = ss.str();
       return false;
@@ -434,34 +435,34 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu
     if (tmp_sub_map.find(result[1]) != tmp_sub_map.end()) {
       stringstream ss;
       ss << "Illegal parameter: ";
-      ss << itPart->first;
+      ss << it_part->first;
       ss << " subscribed topic ";
       ss << result[1];
       ss << " not included in subscribed_topic_and_filter_map's topic list!";
       err_info = ss.str();
       return false;
     }
-    if (itPart->first.find_first_of(delimiter::kDelimiterComma) != string::npos) {
+    if (it_part->first.find_first_of(delimiter::kDelimiterComma) != string::npos) {
       stringstream ss;
       ss << "Illegal parameter: key ";
-      ss << itPart->first;
+      ss << it_part->first;
       ss << " include ";
       ss << delimiter::kDelimiterComma;
       ss << " char!";
       err_info = ss.str();
       return false;
     }
-    if (itPart->second < 0) {
+    if (it_part->second < 0) {
       stringstream ss;
       ss << "Illegal parameter: ";
-      ss << itPart->first;
+      ss << it_part->first;
       ss << "'s offset must over or equal zero, value is ";
-      ss << itPart->second;
+      ss << it_part->second;
       err_info = ss.str();
       return false;
     }
     Utils::Join(result, delimiter::kDelimiterColon, part_key);
-    tmp_parts_map[part_key] = itPart->second;
+    tmp_parts_map[part_key] = it_part->second;
   }
   // set verified data
   this->is_bound_consume_ = true;
@@ -551,7 +552,7 @@ string ConsumerConfig::ToString() {
   int i = 0;
   stringstream ss;
   map<string, long>::iterator it;
-  map<string, set<string> >::iterator itMap;
+  map<string, set<string> >::iterator it_map;
 
   // print info
   ss << "ConsumerConfig = {";
@@ -559,8 +560,8 @@ string ConsumerConfig::ToString() {
   ss << ", group_name_='";
   ss << this->group_name_;
   ss << "', sub_topic_and_filter_map_={";
-  for (itMap = this->sub_topic_and_filter_map_.begin(); 
-      itMap != this->sub_topic_and_filter_map_.end(); ++itMap) {
+  for (it_map = this->sub_topic_and_filter_map_.begin(); 
+      it_map != this->sub_topic_and_filter_map_.end(); ++it_map) {
     if (i++ > 0) {
       ss << ",";
     }
@@ -568,8 +569,8 @@ string ConsumerConfig::ToString() {
     ss << itMap->first;
     ss << "'=[";
     int j=0;
-    set<string> topicSet = itMap->second;
-    for (set<string>::iterator it = topicSet.begin(); it != topicSet.end(); ++it) {
+    set<string> topic_set = it_map->second;
+    for (set<string>::iterator it = topic_set.begin(); it != topic_set.end(); ++it) {
       if (j++ > 0) {
         ss << ",";
       }
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index f9a1b1e..f8ee3c2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -237,6 +237,12 @@ bool Utils::ValidFilterItem(string& err_info,
 }
 
 
+long Utils::GetCurrentTimeMillis() {  // gettimeofday() time in whole ms
+  struct timeval now;
+  gettimeofday(&now, NULL);
+  return now.tv_usec / 1000 + now.tv_sec * 1000;  // usec -> ms, sec -> ms
+}
+
 
 
 }