You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/15 02:01:26 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-280]Create C/C++ subscribe info class (#206)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new df42152  [TUBEMQ-280]Create C/C++ subscribe info class (#206)
df42152 is described below

commit df42152eb7dc11974b6ced7729f4b9afb2fec67a
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed Jul 15 02:01:15 2020 +0000

    [TUBEMQ-280]Create C/C++ subscribe info class (#206)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq-client-cpp/include/tubemq/atomic_def.h  |   6 +-
 .../include/tubemq/client_subinfo.h                |  80 +++++++++++++
 .../include/tubemq/executor_pool.h                 |   2 +
 .../tubemq-client-cpp/include/tubemq/meta_info.h   |   6 +-
 .../tubemq-client-cpp/src/client_subinfo.cc        | 130 +++++++++++++++++++++
 .../tubemq-client-cpp/src/rmt_data_cache.cc        |   7 +-
 6 files changed, 222 insertions(+), 9 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
index 30830d9..efd5332 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
@@ -29,7 +29,7 @@ class AtomicInteger {
  public:
   AtomicInteger() { this->counter_ = 0; }
 
-  AtomicInteger(int32_t initial_value) { this->counter_ = initial_value; }
+  explicit AtomicInteger(int32_t initial_value) { this->counter_ = initial_value; }
 
   int32_t Get() const { return this->counter_; }
 
@@ -118,7 +118,7 @@ class AtomicLong {
  public:
   AtomicLong() { this->counter_ = 0; }
 
-  AtomicLong(int64_t initial_value) { this->counter_ = initial_value; }
+  explicit AtomicLong(int64_t initial_value) { this->counter_ = initial_value; }
 
   int64_t Get() const { return this->counter_; }
 
@@ -207,7 +207,7 @@ class AtomicBoolean {
  public:
   AtomicBoolean() { this->counter_ = 0; }
 
-  AtomicBoolean(bool initial_value) { this->counter_ = initial_value ? 1 : 0; }
+  explicit AtomicBoolean(bool initial_value) { this->counter_ = initial_value ? 1 : 0; }
 
   bool Get() const { return this->counter_ != 0; }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
new file mode 100644
index 0000000..c609e71
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_SUBINFO_H_
+#define TUBEMQ_CLIENT_SUBINFO_H_
+
+#include <stdint.h>
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include "tubemq/atomic_def.h"
+
+namespace tubemq {
+
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+
+
+class ClientSubInfo {  // client-side subscribe info: topics, filters and bound-consume data
+ public:
+  ClientSubInfo();
+  // Sets the target; the last four arguments are stored only when bound_consume is true.
+  void SetConsumeTarget(bool bound_consume,
+      const map<string, set<string> >& topic_and_filter_map, const string& session_key,
+      uint32_t source_count, bool select_big, const map<string, int64_t>& part_offset_map);
+  bool CompAndSetNotAllocated(bool expect, bool update);  // CompareAndSet on not_allocated_
+  void BookFstRegistered() { first_registered_.Set(true); }
+  bool IsBoundConsume() const { return bound_consume_; }
+  bool IsNotAllocated() const { return not_allocated_.Get(); }
+  int64_t GetSubscribedTime() const { return subscribed_time_; }
+  const string& GetSessionKey() const { return session_key_; }
+  uint32_t GetSourceCnt() const { return source_count_; }
+  bool SelectBig() const { return select_big_; }
+  bool IsFilterConsume(const string& topic);  // true only for topics given with filters
+  void GetAssignedPartOffset(const string& partition_key, int64_t& offset);  // offset is output
+  const string& GetBoundPartInfo() const { return bound_partions_; }
+  const list<string>& GetSubTopics() const { return topics_; }
+  const list<string>& GetTopicConds() const { return topic_conds_; }
+  const map<string, set<string> >& GetTopicFilterMap() const;  // returns topic_and_filter_map_
+
+ private:
+  bool bound_consume_;
+  AtomicBoolean first_registered_;
+  AtomicBoolean not_allocated_;
+  int64_t subscribed_time_;  // set from Utils::GetCurrentTimeMillis() in SetConsumeTarget
+  map<string, set<string> > topic_and_filter_map_;  // topic -> filter items, as passed in
+  list<string> topics_;  // keys of topic_and_filter_map_
+  list<string> topic_conds_;  // one joined "topic + filters" string per filtered topic
+  map<string, bool> topic_filter_map_;  // topic -> has a non-empty filter set
+  // bound info
+  string session_key_;
+  uint32_t source_count_;
+  bool select_big_;
+  map<string, int64_t> assigned_part_map_;  // partition key -> assigned offset
+  string bound_partions_;  // assigned_part_map_ rendered as joined key/offset pairs
+};
+
+}  // namespace tubemq
+
+
+#endif  // TUBEMQ_CLIENT_SUBINFO_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
index 0dd1f66..a5208a4 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
@@ -28,6 +28,8 @@
 #include <memory>
 #include <mutex>
 #include <thread>
+#include <vector>
+
 
 #include "tubemq/noncopyable.h"
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
index ccd62e9..3091347 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
@@ -65,7 +65,7 @@ class NodeInfo {
 class Partition {
  public:
   Partition();
-  Partition(const string& partition_info);
+  explicit Partition(const string& partition_info);
   Partition(const NodeInfo& broker_info, const string& part_str);
   Partition(const NodeInfo& broker_info, const string& topic, uint32_t partition_id);
   ~Partition();
@@ -94,7 +94,7 @@ class Partition {
 class PartitionExt : public Partition {
  public:
   PartitionExt();
-  PartitionExt(const string& partition_info);
+  explicit PartitionExt(const string& partition_info);
   PartitionExt(const NodeInfo& broker_info, const string& part_str);
   ~PartitionExt();
   PartitionExt& operator=(const PartitionExt& target);
@@ -136,7 +136,7 @@ class PartitionExt : public Partition {
 class SubscribeInfo {
  public:
   SubscribeInfo();
-  SubscribeInfo(const string& sub_info);
+  explicit SubscribeInfo(const string& sub_info);
   SubscribeInfo(const string& consumer_id,
         const string& group_name, const PartitionExt& partition_ext);
   SubscribeInfo& operator=(const SubscribeInfo& target);
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
new file mode 100644
index 0000000..0c8064c
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tubemq/client_subinfo.h"
+#include "tubemq/const_config.h"
+#include "tubemq/utils.h"
+
+
+
+namespace tubemq {
+
+
+// Starts unbound and unregistered with not_allocated_ raised; subscribed_time_ is
+// invalid until SetConsumeTarget() runs. Strings and containers start out empty.
+ClientSubInfo::ClientSubInfo()
+    : bound_consume_(false),
+      first_registered_(false),
+      not_allocated_(true),
+      subscribed_time_(tb_config::kInvalidValue),
+      source_count_(0),
+      select_big_(false) {
+}
+
+// Stores the consume target, stamps the subscribe time and rebuilds the derived
+// topic lists, the per-topic filter flags and (bound consume) bound_partions_.
+void ClientSubInfo::SetConsumeTarget(bool bound_consume,
+         const map<string, set<string> >& topic_and_filter_map,
+         const string& session_key, uint32_t source_count,
+         bool select_big, const map<string, int64_t>& part_offset_map) {
+  // book register time
+  subscribed_time_ = Utils::GetCurrentTimeMillis();
+  first_registered_.Set(false);
+  bound_consume_ = bound_consume;
+  topic_and_filter_map_ = topic_and_filter_map;
+  // Drop everything derived from an earlier call; topic_filter_map_ must be
+  // cleared too, otherwise IsFilterConsume() keeps answering for old topics.
+  topics_.clear();
+  topic_conds_.clear();
+  topic_filter_map_.clear();
+  // build topic filter info
+  map<string, set<string> >::const_iterator it_topic;
+  for (it_topic = topic_and_filter_map.begin();
+      it_topic != topic_and_filter_map.end(); ++it_topic) {
+    topics_.push_back(it_topic->first);
+    const bool has_filter = !it_topic->second.empty();
+    topic_filter_map_[it_topic->first] = has_filter;
+    if (!has_filter) {
+      continue;
+    }
+    // build topic conditions: topic, pound delimiter, comma-joined filter items
+    string cond_str = it_topic->first;
+    cond_str += delimiter::kDelimiterPound;
+    set<string>::const_iterator it_set;
+    for (it_set = it_topic->second.begin();
+        it_set != it_topic->second.end(); ++it_set) {
+      if (it_set != it_topic->second.begin()) {
+        cond_str += delimiter::kDelimiterComma;
+      }
+      cond_str += *it_set;
+    }
+    topic_conds_.push_back(cond_str);
+  }
+
+  // build bound_partition info; the bound fields are only touched when
+  // bound_consume is true, so earlier values are kept otherwise
+  if (bound_consume) {
+    session_key_ = session_key;
+    source_count_ = source_count;
+    select_big_ = select_big;
+    assigned_part_map_ = part_offset_map;
+    bound_partions_ = "";
+    map<string, int64_t>::const_iterator it;
+    for (it = part_offset_map.begin(); it != part_offset_map.end(); ++it) {
+      if (it != part_offset_map.begin()) {
+        bound_partions_ += delimiter::kDelimiterComma;
+      }
+      bound_partions_ += it->first;
+      bound_partions_ += delimiter::kDelimiterEqual;
+      bound_partions_ += Utils::Long2str(it->second);
+    }
+  }
+}
+
+bool ClientSubInfo::CompAndSetNotAllocated(bool expect, bool update) {
+  return this->not_allocated_.CompareAndSet(expect, update);  // forwards to the atomic flag
+}
+
+// Tells whether `topic` was recorded with a non-empty filter set; topics that
+// were never recorded report false.
+bool ClientSubInfo::IsFilterConsume(const string& topic) {
+  const map<string, bool>::const_iterator pos = topic_filter_map_.find(topic);
+  // unknown topic: answered the same way as "no filter"
+  const bool known = (pos != topic_filter_map_.end());
+  return known && pos->second;
+}
+
+// Outputs partition_key's assigned offset, or tb_config::kInvalidValue if none applies.
+void ClientSubInfo::GetAssignedPartOffset(const string& partition_key, int64_t& offset) {
+  offset = tb_config::kInvalidValue;  // default first so a hit below is not overwritten
+  if (first_registered_.Get() && bound_consume_ && not_allocated_.Get()) {
+    map<string, int64_t>::const_iterator it = assigned_part_map_.find(partition_key);
+    if (it != assigned_part_map_.end()) {
+      offset = it->second;
+    }
+  }
+}
+
+const map<string, set<string> >& ClientSubInfo::GetTopicFilterMap() const {
+  return this->topic_and_filter_map_;  // the map as last given to SetConsumeTarget()
+}
+
+
+}  // namespace tubemq
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index caa4236..2164fa9 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -176,9 +176,10 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
   return result;
 }
 
-void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t curr_offset,
-                                             int32_t err_code, bool esc_limit, int32_t msg_size,
-                                             int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow) {
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, 
+                     int64_t curr_offset, int32_t err_code, bool esc_limit,
+                     int32_t msg_size,int64_t limit_dlt, int64_t cur_data_dlt,
+                     bool require_slow) {
   map<string, PartitionExt>::iterator it_part;
   // book partition offset info
   if (curr_offset >= 0) {