You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/09 03:29:30 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-269] Create C/C++ RmtDataCache class (#195)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 5343eb4  [TUBEMQ-269] Create C/C++ RmtDataCache class (#195)
5343eb4 is described below

commit 5343eb44d5f09da30a30ae6baa0dc6a2be05a9d1
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Jul 9 03:29:21 2020 +0000

    [TUBEMQ-269] Create C/C++ RmtDataCache class (#195)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../include/tubemq/client_config.h                 |   3 +-
 .../include/tubemq/const_errcode.h                 |  49 +++++++
 .../include/tubemq/flowctrl_def.h                  |   2 +-
 .../tubemq-client-cpp/include/tubemq/message.h     |   2 +-
 .../tubemq-client-cpp/include/tubemq/meta_info.h   |  33 +++++
 .../tubemq-client-cpp/src/client_config.cc         |  10 +-
 .../tubemq-client-cpp/src/file_ini.cc              |   4 +
 .../tubemq-client-cpp/src/flowctrl_def.cc          |   2 +
 .../tubemq-client-cpp/src/message.cc               |   5 +-
 .../tubemq-client-cpp/src/meta_info.cc             | 148 ++++++++++++++++++++-
 tubemq-client-twins/tubemq-client-cpp/src/utils.cc |   3 +
 11 files changed, 250 insertions(+), 11 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
index 95bef67..fecbeb2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_config.h
@@ -32,7 +32,6 @@ namespace tubemq {
 using std::map;
 using std::set;
 using std::string;
-using std::vector;
 
 class BaseConfig {
  public:
@@ -122,7 +121,7 @@ class ConsumerConfig : public BaseConfig {
  private:
   bool setGroupConsumeTarget(string& err_info, bool is_bound_consume, const string& group_name,
                              const map<string, set<string> >& subscribed_topic_and_filter_map,
-                             const string& session_key, int32_t source_count, bool is_select_big,
+                             const string& session_key, uint32_t source_count, bool is_select_big,
                              const map<string, int64_t>& part_offset_map);
 
  private:
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h
new file mode 100644
index 0000000..c4e6370
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/const_errcode.h
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_CONST_ERR_CODE_H_
+#define TUBEMQ_CLIENT_CONST_ERR_CODE_H_
+
+#include <stdint.h>
+
+namespace tubemq {
+
+// Numeric result codes used by the client. The values are grouped by hundreds
+// (2xx, 3xx, 4xx, 5xx), in the manner of HTTP status codes.
+namespace err_code {
+  // 2xx / 3xx
+  static const int32_t kErrSuccess = 200;
+  static const int32_t kErrNotReady = 201;
+  static const int32_t kErrMoved = 301;
+
+  // 4xx
+  static const int32_t kErrBadRequest = 400;
+  static const int32_t kErrUnAuthorized = 401;
+  static const int32_t kErrForbidden = 403;
+  static const int32_t kErrNotFound = 404;
+  static const int32_t kErrPartitionOccupied = 410;
+  static const int32_t kErrHbNoNode = 411;
+  static const int32_t kErrDuplicatePartition = 412;
+  static const int32_t kErrCertificateFailure = 415;
+  static const int32_t kErrServerOverflow = 419;
+  static const int32_t kErrConsumeGroupForbidden = 450;
+  static const int32_t kErrConsumeSpeedLimit = 452;
+  static const int32_t kErrConsumeContentForbidden = 455;
+
+  // 5xx
+  static const int32_t kErrServerError = 500;
+  static const int32_t kErrServiceUnavilable = 503;
+  // Correctly spelled alias; the misspelled name above is kept for compatibility.
+  static const int32_t kErrServiceUnavailable = kErrServiceUnavilable;
+  static const int32_t kErrServerMsgsetNullError = 510;
+  static const int32_t kErrWaitServerRspTimeout = 550;
+}  // namespace err_code
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_CONST_ERR_CODE_H_
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
index 98d8be4..0f05bb3 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
@@ -91,7 +91,7 @@ class FlowCtrlRuleHandler {
   ~FlowCtrlRuleHandler();
   void UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id, int64_t flowctrl_id,
                              const string& flowctrl_info);
-  bool GetCurDataLimit(int32_t last_datadlt, FlowCtrlResult& flowctrl_result);
+  bool GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result);
   int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit);
   int32_t GetMinZeroCnt() { return this->min_zero_cnt_.Get(); }
   int32_t GetQryPriorityId() { return this->qrypriority_id_.Get(); }
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h
index 945b59e..8d68201 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/message.h
@@ -45,7 +45,7 @@ class Message {
   void SetTopic(const string& topic);
   const char* GetData() const;
   uint32_t GetDataLength() const;
-  void setData(const char* data, int datalen);
+  void setData(const char* data, uint32_t datalen);
   const int32_t GetFlag() const;
   void SetFlag(int32_t flag);
   const map<string, string>& GetProperties() const;
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
index e99939d..6dcd324 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
@@ -141,6 +141,39 @@ class ConsumerEvent {
 class PartitionExt : public Partition {
  public:
   PartitionExt();
+  PartitionExt(const string& partition_info);
+  PartitionExt(const NodeInfo& broker_info, const string& part_str);
+  ~PartitionExt();
+  void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit,
+    int64_t rsp_dlt_limit, long last_datadlt, bool require_slow);  // stored in booked_* fields
+  int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+    const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed);
+  int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+    const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed,
+    int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t rsp_dlt_limit,
+    int64_t last_datadlt, bool require_slow);  // updates counters, returns chosen limit
+  void SetLastConsumed(bool last_consumed);
+  bool IsLastConsumed();
+ private:
+  void resetParameters();  // called by every constructor
+
+ private:
+  bool is_last_consumed_;  // accessed via SetLastConsumed()/IsLastConsumed()
+  FlowCtrlResult cur_flowctrl_;  // data-size and frequency limits currently in force
+  FlowCtrlItem cur_freqctrl_;  // filter-control item from the group or default handler
+  int64_t next_stage_updtime_;  // set to now + 60000 ms when a stage starts
+  int64_t next_slice_updtime_;  // set to now + 5000 ms when a slice starts
+  int64_t limit_slice_msgsize_;  // cur_flowctrl_ data-size limit / 12
+  int64_t cur_stage_msgsize_;  // msg_size accumulated in the current stage
+  int64_t cur_slice_msgsize_;  // msg_size accumulated in the current slice
+  int32_t total_zero_cnt_;  // run of results with msg_size == 0 and errcode != kErrSuccess
+  int64_t booked_time_;  // Utils::GetCurrentTimeMillis() at the last BookConsumeData()
+  int32_t booked_errcode_;  // errcode passed to BookConsumeData()
+  bool    booked_esc_limit_;  // req_esc_limit passed to BookConsumeData()
+  int32_t booked_msgsize_;  // msg_size passed to BookConsumeData()
+  int64_t booked_dlt_limit_;  // rsp_dlt_limit passed to BookConsumeData()
+  int64_t booked_curdata_dlt_;  // last_datadlt passed to BookConsumeData()
+  bool    booked_require_slow_;  // require_slow passed to BookConsumeData()
 };
 
 }  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
index 6c1f757..c6da13d 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc
@@ -28,6 +28,11 @@
 
 namespace tubemq {
 
+using std::stringstream;
+using std::set;
+using std::vector;
+
+
 BaseConfig::BaseConfig() {
   this->master_addrinfo_ = "";
   this->auth_enable_ = false;
@@ -391,11 +396,6 @@ bool ConsumerConfig::setGroupConsumeTarget(
     }
     return false;
   }
-  // check source_count
-  if (source_count <= 0) {
-    err_info = "Illegal parameter: source_count must over zero!";
-    return false;
-  }
   // check part_offset_map
   string part_key;
   map<string, int64_t> tmp_parts_map;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
index fe8edcd..97cf806 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
@@ -29,6 +29,10 @@
 
 namespace tubemq {
 
+using std::ifstream;
+
+
+
 Fileini::Fileini() {
   this->init_flag_ = false;
   this->ini_map_.clear();
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
index 1975306..27caba8 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
@@ -31,6 +31,8 @@
 
 namespace tubemq {
 
+using std::stringstream;
+
 FlowCtrlResult::FlowCtrlResult() {
   this->datasize_limit_ = config::kMaxIntValue;
   this->freqms_limit_ = 0;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/message.cc b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
index 7fb83f8..d0afacf 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
@@ -28,6 +28,9 @@
 
 namespace tubemq {
 
+using std::stringstream;
+
+
 // message flag's properties settings
 static const int32_t kMsgFlagIncProperties = 0x01;
 // reserved property key Filter Item
@@ -73,7 +76,7 @@ Message& Message::operator=(const Message& target) {
   return *this;
 }
 
-const uint64_t Message::GetMessageId() const { return this->message_id_; }
+const int64_t Message::GetMessageId() const { return this->message_id_; }
 
 void Message::SetMessageId(int64_t message_id) { this->message_id_ = message_id; }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index c1f428c..81ecf79 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -25,11 +25,12 @@
 #include <vector>
 
 #include "const_config.h"
+#include "const_errcode.h"
 #include "utils.h"
 
 namespace tubemq {
 
-using std::sstream;
+using std::stringstream;
 using std::vector;
 
 NodeInfo::NodeInfo() {
@@ -381,4 +382,169 @@ string ConsumerEvent::ToString() {
   return ss.str();
 }
 
+
+// Accounting windows used by PartitionExt::ProcConsumeResult(): a stage lasts
+// 60000 ms and is divided into slices of 5000 ms.
+static const int64_t kStageIntervalMs = 60000;
+static const int64_t kSliceIntervalMs = 5000;
+
+PartitionExt::PartitionExt() : Partition() {
+  resetParameters();
+}
+
+PartitionExt::PartitionExt(const string& partition_info) : Partition(partition_info) {
+  resetParameters();
+}
+
+PartitionExt::PartitionExt(const NodeInfo& broker_info, const string& part_str)
+  : Partition(broker_info, part_str) {
+  resetParameters();
+}
+
+PartitionExt::~PartitionExt() {
+  //
+}
+
+// Records the outcome of one consume request together with the current time,
+// so that the four-argument ProcConsumeResult() can evaluate it later.
+// last_datadlt stays "long" to match the declaration in meta_info.h.
+void PartitionExt::BookConsumeData(int32_t errcode, int32_t msg_size,
+  bool req_esc_limit, int64_t rsp_dlt_limit, long last_datadlt, bool require_slow) {
+  this->booked_time_ = Utils::GetCurrentTimeMillis();
+  this->booked_errcode_ = errcode;
+  this->booked_esc_limit_ = req_esc_limit;
+  this->booked_msgsize_ = msg_size;
+  this->booked_dlt_limit_ = rsp_dlt_limit;
+  this->booked_curdata_dlt_ = last_datadlt;
+  this->booked_require_slow_ = require_slow;
+}
+
+// Evaluates the result stored by BookConsumeData() and subtracts the time that
+// has passed since it was booked; the returned value may therefore be negative.
+int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+  const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed) {
+  int64_t dlt_time = Utils::GetCurrentTimeMillis() - this->booked_time_;
+  return ProcConsumeResult(def_flowctrl_handler, group_flowctrl_handler, filter_consume,
+    last_consumed, this->booked_errcode_, this->booked_msgsize_, this->booked_esc_limit_,
+    this->booked_dlt_limit_, this->booked_curdata_dlt_, this->booked_require_slow_) - dlt_time;
+}
+
+// Folds one consume result into the partition's flow-control state and returns
+// the limit value chosen for it (presumably a wait in milliseconds before the
+// next request -- confirm with the callers):
+//   - errcode other than kErrSuccess/kErrNotFound: rsp_dlt_limit;
+//   - a run of empty kErrNotFound results: GetCurFreqLimitTime() of the group
+//     handler, or of the default handler when the group's GetMinZeroCnt() is
+//     kMaxIntValue;
+//   - req_esc_limit set: 0;
+//   - stage or slice size counter at its limit: the larger of the current
+//     frequency limit and rsp_dlt_limit;
+//   - kErrSuccess with a non-negative filter-control value: a value taken from
+//     the filter-control item;
+//   - otherwise: rsp_dlt_limit.
+// NOTE(review): both handlers are const references, but the accessors visible in
+// flowctrl_def.h (GetCurDataLimit, GetCurFreqLimitTime, GetMinZeroCnt) are not
+// declared const there -- confirm they are const-callable.
+int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+  const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed,
+  int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t rsp_dlt_limit,
+  int64_t last_datadlt, bool require_slow) {
+  // Accumulated data received
+  this->is_last_consumed_ = last_consumed;
+  this->cur_stage_msgsize_ += msg_size;
+  this->cur_slice_msgsize_ += msg_size;
+  // Update strategy data values
+  int64_t curr_time = Utils::GetCurrentTimeMillis();
+  if (curr_time > this->next_stage_updtime_) {
+    // The stage has expired: restart both counters and refresh the limits.
+    this->cur_stage_msgsize_ = 0;
+    this->cur_slice_msgsize_ = 0;
+    if (last_datadlt >= 0) {
+      // Group rules are consulted first, then the default rules; when neither
+      // yields a limit, fall back to kMaxLongValue with a frequency limit of 0.
+      if (!group_flowctrl_handler.GetCurDataLimit(last_datadlt, this->cur_flowctrl_)
+        && !def_flowctrl_handler.GetCurDataLimit(last_datadlt, this->cur_flowctrl_)) {
+        this->cur_flowctrl_.SetDataDltAndFreqLimit(config::kMaxLongValue, 0);
+      }
+      this->cur_freqctrl_ = group_flowctrl_handler.GetFilterCtrlItem();
+      if (this->cur_freqctrl_.GetFreqMsLimit() < 0) {
+        this->cur_freqctrl_ = def_flowctrl_handler.GetFilterCtrlItem();
+      }
+      curr_time = Utils::GetCurrentTimeMillis();
+    }
+    // The per-slice limit is the data-size limit divided by the slices per stage.
+    this->limit_slice_msgsize_ =
+      this->cur_flowctrl_.GetDataSizeLimit() / (kStageIntervalMs / kSliceIntervalMs);
+    this->next_stage_updtime_ = curr_time + kStageIntervalMs;
+    this->next_slice_updtime_ = curr_time + kSliceIntervalMs;
+  } else if (curr_time > this->next_slice_updtime_) {
+    this->cur_slice_msgsize_ = 0;
+    this->next_slice_updtime_ = curr_time + kSliceIntervalMs;
+  }
+  // Perform different strategies based on error codes
+  if (errcode != err_code::kErrSuccess && errcode != err_code::kErrNotFound) {
+    return rsp_dlt_limit;
+  }
+  if (msg_size == 0 && errcode != err_code::kErrSuccess) {
+    this->total_zero_cnt_ += 1;
+  } else {
+    this->total_zero_cnt_ = 0;
+  }
+  if (this->total_zero_cnt_ > 0) {
+    if (group_flowctrl_handler.GetMinZeroCnt() != config::kMaxIntValue) {
+      return static_cast<int64_t>(group_flowctrl_handler.GetCurFreqLimitTime(
+        this->total_zero_cnt_, static_cast<int32_t>(rsp_dlt_limit)));
+    }
+    return static_cast<int64_t>(def_flowctrl_handler.GetCurFreqLimitTime(
+      this->total_zero_cnt_, static_cast<int32_t>(rsp_dlt_limit)));
+  }
+  if (req_esc_limit) {
+    return 0;
+  }
+  if (this->cur_stage_msgsize_ >= this->cur_flowctrl_.GetDataSizeLimit()
+    || this->cur_slice_msgsize_ >= this->limit_slice_msgsize_) {
+    return this->cur_flowctrl_.GetFreqMsLimit() > rsp_dlt_limit
+      ? this->cur_flowctrl_.GetFreqMsLimit() : rsp_dlt_limit;
+  }
+  if (errcode == err_code::kErrSuccess) {
+    if (filter_consume && this->cur_freqctrl_.GetFreqMsLimit() >= 0) {
+      if (require_slow) {
+        return this->cur_freqctrl_.GetZeroCnt();
+      }
+      return this->cur_freqctrl_.GetFreqMsLimit();
+    }
+    if (!filter_consume && this->cur_freqctrl_.GetDataSizeLimit() >= 0) {
+      return this->cur_freqctrl_.GetDataSizeLimit();
+    }
+  }
+  return rsp_dlt_limit;
+}
+
+void PartitionExt::SetLastConsumed(bool last_consumed) {
+  this->is_last_consumed_ = last_consumed;
+}
+
+bool PartitionExt::IsLastConsumed() {
+  return this->is_last_consumed_;
+}
+
+// Sets the flow-control counters and the booked-result fields to their initial
+// values; cur_freqctrl_ is left as default-constructed.
+void PartitionExt::resetParameters() {
+  this->is_last_consumed_ = false;
+  this->cur_flowctrl_.SetDataDltAndFreqLimit(config::kMaxLongValue, 20);
+  this->next_stage_updtime_ = 0;
+  this->next_slice_updtime_ = 0;
+  this->limit_slice_msgsize_ = 0;
+  this->cur_stage_msgsize_ = 0;
+  this->cur_slice_msgsize_ = 0;
+  this->total_zero_cnt_ = 0;
+  this->booked_time_ = 0;
+  this->booked_errcode_ = 0;
+  this->booked_esc_limit_ = false;
+  this->booked_msgsize_ = 0;
+  this->booked_dlt_limit_ = 0;
+  this->booked_curdata_dlt_ = 0;
+  this->booked_require_slow_ = false;
+}
+
+
 };  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index 2fba193..bbfba96 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -31,6 +31,9 @@
 
 namespace tubemq {
 
+using std::stringstream;
+
+
 static const string kWhitespaceCharSet = " \n\r\t\f\v";
 
 string Utils::Trim(const string& source) {