You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/15 02:01:26 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-280]Create C/C++ subscribe info class (#206)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new df42152 [TUBEMQ-280]Create C/C++ subscribe info class (#206)
df42152 is described below
commit df42152eb7dc11974b6ced7729f4b9afb2fec67a
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed Jul 15 02:01:15 2020 +0000
[TUBEMQ-280]Create C/C++ subscribe info class (#206)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../tubemq-client-cpp/include/tubemq/atomic_def.h | 6 +-
.../include/tubemq/client_subinfo.h | 80 +++++++++++++
.../include/tubemq/executor_pool.h | 2 +
.../tubemq-client-cpp/include/tubemq/meta_info.h | 6 +-
.../tubemq-client-cpp/src/client_subinfo.cc | 130 +++++++++++++++++++++
.../tubemq-client-cpp/src/rmt_data_cache.cc | 7 +-
6 files changed, 222 insertions(+), 9 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
index 30830d9..efd5332 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
@@ -29,7 +29,7 @@ class AtomicInteger {
public:
AtomicInteger() { this->counter_ = 0; }
- AtomicInteger(int32_t initial_value) { this->counter_ = initial_value; }
+ explicit AtomicInteger(int32_t initial_value) { this->counter_ = initial_value; }
int32_t Get() const { return this->counter_; }
@@ -118,7 +118,7 @@ class AtomicLong {
public:
AtomicLong() { this->counter_ = 0; }
- AtomicLong(int64_t initial_value) { this->counter_ = initial_value; }
+ explicit AtomicLong(int64_t initial_value) { this->counter_ = initial_value; }
int64_t Get() const { return this->counter_; }
@@ -207,7 +207,7 @@ class AtomicBoolean {
public:
AtomicBoolean() { this->counter_ = 0; }
- AtomicBoolean(bool initial_value) { this->counter_ = initial_value ? 1 : 0; }
+ explicit AtomicBoolean(bool initial_value) { this->counter_ = initial_value ? 1 : 0; }
bool Get() const { return this->counter_ != 0; }
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
new file mode 100644
index 0000000..c609e71
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_subinfo.h
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_SUBINFO_H_
+#define TUBEMQ_CLIENT_SUBINFO_H_
+
+#include <stdint.h>
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include "tubemq/atomic_def.h"
+
+namespace tubemq {
+
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+
+
+// Subscription state kept for one consumer client: the subscribed topics with
+// their filter items, plus the optional bound-consume settings (session key,
+// source count, select-big flag and per-partition offsets).
+class ClientSubInfo {
+ public:
+  // Builds an empty record; see the definition for the initial field values.
+  ClientSubInfo();
+  // Stores the consume target and rebuilds the data derived from it
+  // (topic list, topic condition strings, bound partition string).
+  void SetConsumeTarget(bool bound_consume,
+                        const map<string, set<string> >& topic_and_filter_map,
+                        const string& session_key, uint32_t source_count,
+                        bool select_big, const map<string, int64_t>& part_offset_map);
+  // Forwards to CompareAndSet on the not-allocated flag and returns its result.
+  bool CompAndSetNotAllocated(bool expect, bool update);
+  // Sets the first-registered flag to true.
+  void BookFstRegistered() { first_registered_.Set(true); }
+  // Read-only accessors are const so they can be used through a const reference.
+  bool IsBoundConsume() const { return bound_consume_; }
+  bool IsNotAllocated() const { return not_allocated_.Get(); }
+  // Returned by value: a top-level const on the return type would have no effect.
+  int64_t GetSubscribedTime() const { return subscribed_time_; }
+  const string& GetSessionKey() const { return session_key_; }
+  uint32_t GetSourceCnt() const { return source_count_; }
+  bool SelectBig() const { return select_big_; }
+  // True when the given topic was subscribed with a non-empty filter set.
+  bool IsFilterConsume(const string& topic);
+  // Writes the offset recorded for partition_key into offset; the exact
+  // conditions are in the definition in client_subinfo.cc.
+  void GetAssignedPartOffset(const string& partition_key, int64_t& offset);
+  // Text form of the bound partitions as built by SetConsumeTarget().
+  const string& GetBoundPartInfo() const { return bound_partions_; }
+  // Names of all subscribed topics.
+  const list<string>& GetSubTopics() const { return topics_; }
+  // One condition string per topic that has filter items.
+  const list<string>& GetTopicConds() const { return topic_conds_; }
+  // The topic -> filter-item-set map as passed to SetConsumeTarget().
+  const map<string, set<string> >& GetTopicFilterMap() const;
+
+ private:
+  bool bound_consume_;
+  AtomicBoolean first_registered_;
+  AtomicBoolean not_allocated_;
+  // value of Utils::GetCurrentTimeMillis() at the last SetConsumeTarget() call
+  int64_t subscribed_time_;
+  map<string, set<string> > topic_and_filter_map_;
+  list<string> topics_;
+  list<string> topic_conds_;
+  // topic -> whether the topic has at least one filter item
+  map<string, bool> topic_filter_map_;
+  // bound info
+  string session_key_;
+  uint32_t source_count_;
+  bool select_big_;
+  // partition key -> assigned offset
+  map<string, int64_t> assigned_part_map_;
+  string bound_partions_;
+};
+
+} // namespace tubemq
+
+
+#endif // TUBEMQ_CLIENT_SUBINFO_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
index 0dd1f66..a5208a4 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
@@ -28,6 +28,8 @@
#include <memory>
#include <mutex>
#include <thread>
+#include <vector>
+
#include "tubemq/noncopyable.h"
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
index ccd62e9..3091347 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
@@ -65,7 +65,7 @@ class NodeInfo {
class Partition {
public:
Partition();
- Partition(const string& partition_info);
+ explicit Partition(const string& partition_info);
Partition(const NodeInfo& broker_info, const string& part_str);
Partition(const NodeInfo& broker_info, const string& topic, uint32_t partition_id);
~Partition();
@@ -94,7 +94,7 @@ class Partition {
class PartitionExt : public Partition {
public:
PartitionExt();
- PartitionExt(const string& partition_info);
+ explicit PartitionExt(const string& partition_info);
PartitionExt(const NodeInfo& broker_info, const string& part_str);
~PartitionExt();
PartitionExt& operator=(const PartitionExt& target);
@@ -136,7 +136,7 @@ class PartitionExt : public Partition {
class SubscribeInfo {
public:
SubscribeInfo();
- SubscribeInfo(const string& sub_info);
+ explicit SubscribeInfo(const string& sub_info);
SubscribeInfo(const string& consumer_id,
const string& group_name, const PartitionExt& partition_ext);
SubscribeInfo& operator=(const SubscribeInfo& target);
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
new file mode 100644
index 0000000..0c8064c
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tubemq/client_subinfo.h"
+#include "tubemq/const_config.h"
+#include "tubemq/utils.h"
+
+
+
+namespace tubemq {
+
+
+// Creates a record with no subscription: bound consume off, empty session key
+// and bound partition text, zero sources, an invalid subscribe time, the
+// not-allocated flag raised and the first-registered flag lowered.
+ClientSubInfo::ClientSubInfo()
+    : bound_consume_(false),
+      subscribed_time_(tb_config::kInvalidValue),
+      session_key_(""),
+      source_count_(0),
+      select_big_(false),
+      bound_partions_("") {
+  // the atomic flags are set through their own setters
+  first_registered_.Set(false);
+  not_allocated_.Set(true);
+}
+
+// Records what this client consumes and rebuilds the data derived from it.
+//
+// bound_consume        - whether the bound-consume settings below are applied
+// topic_and_filter_map - topic name -> filter items (empty set: no filtering)
+// session_key, source_count, select_big, part_offset_map
+//                      - bound-consume settings; stored only when
+//                        bound_consume is true, otherwise the members keep
+//                        their previous values
+//
+// Every call also stamps subscribed_time_ with the current time and resets
+// the first-registered flag to false.
+void ClientSubInfo::SetConsumeTarget(bool bound_consume,
+                     const map<string, set<string> >& topic_and_filter_map,
+                     const string& session_key, uint32_t source_count,
+                     bool select_big, const map<string, int64_t>& part_offset_map) {
+  // book register time
+  subscribed_time_ = Utils::GetCurrentTimeMillis();
+  first_registered_.Set(false);
+  bound_consume_ = bound_consume;
+  topic_and_filter_map_ = topic_and_filter_map;
+
+  // build topic filter info; drop everything derived from an earlier call,
+  // including topic_filter_map_, so topics that are no longer subscribed do
+  // not keep reporting a filter through IsFilterConsume()
+  topics_.clear();
+  topic_conds_.clear();
+  topic_filter_map_.clear();
+  map<string, set<string> >::const_iterator it_topic;
+  for (it_topic = topic_and_filter_map.begin();
+       it_topic != topic_and_filter_map.end(); ++it_topic) {
+    const set<string>& filters = it_topic->second;
+    topics_.push_back(it_topic->first);
+    topic_filter_map_[it_topic->first] = !filters.empty();
+    if (filters.empty()) {
+      continue;
+    }
+
+    // build topic condition: topic, pound delimiter, comma separated filters
+    string cond = it_topic->first;
+    cond += delimiter::kDelimiterPound;
+    set<string>::const_iterator it_set;
+    for (it_set = filters.begin(); it_set != filters.end(); ++it_set) {
+      if (it_set != filters.begin()) {
+        cond += delimiter::kDelimiterComma;
+      }
+      cond += *it_set;
+    }
+    topic_conds_.push_back(cond);
+  }
+
+  // build bound_partition info: key, equal delimiter, offset, comma separated
+  if (bound_consume) {
+    session_key_ = session_key;
+    source_count_ = source_count;
+    select_big_ = select_big;
+    assigned_part_map_ = part_offset_map;
+    bound_partions_.clear();
+    map<string, int64_t>::const_iterator it;
+    for (it = part_offset_map.begin(); it != part_offset_map.end(); ++it) {
+      if (it != part_offset_map.begin()) {
+        bound_partions_ += delimiter::kDelimiterComma;
+      }
+      bound_partions_ += it->first;
+      bound_partions_ += delimiter::kDelimiterEqual;
+      bound_partions_ += Utils::Long2str(it->second);
+    }
+  }
+}
+
+// Hands expect/update to CompareAndSet on the not-allocated flag and passes
+// the outcome back to the caller unchanged.
+bool ClientSubInfo::CompAndSetNotAllocated(bool expect, bool update) {
+  const bool outcome = not_allocated_.CompareAndSet(expect, update);
+  return outcome;
+}
+
+// Tells whether the given topic is consumed with filter items.
+// Topics that are not present in topic_filter_map_ yield false.
+bool ClientSubInfo::IsFilterConsume(const string& topic) {
+  const map<string, bool>::iterator pos = topic_filter_map_.find(topic);
+  const bool known_topic = (pos != topic_filter_map_.end());
+  // the stored flag is only read when the lookup succeeded
+  return known_topic ? pos->second : false;
+}
+
+// Reports the offset recorded for partition_key by SetConsumeTarget().
+//
+// offset receives the recorded value only when the first-registered flag is
+// set, bound consume is enabled, the not-allocated flag is still set and
+// partition_key is present in assigned_part_map_. In every other case offset
+// receives tb_config::kInvalidValue.
+void ClientSubInfo::GetAssignedPartOffset(const string& partition_key, int64_t& offset) {
+  // Start from the "no offset" answer so all miss paths share it. This
+  // assignment used to sit at the end of the function, where it also
+  // overwrote the value found by a successful lookup.
+  offset = tb_config::kInvalidValue;
+  if (first_registered_.Get() && bound_consume_ && not_allocated_.Get()) {
+    map<string, int64_t>::const_iterator it = assigned_part_map_.find(partition_key);
+    if (it != assigned_part_map_.end()) {
+      offset = it->second;
+    }
+  }
+}
+
+// Read-only view of the topic -> filter-item-set map stored by
+// SetConsumeTarget(); the reference points at the member itself.
+const map<string, set<string> >& ClientSubInfo::GetTopicFilterMap() const {
+  const map<string, set<string> >& stored = topic_and_filter_map_;
+  return stored;
+}
+
+
+} // namespace tubemq
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index caa4236..2164fa9 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -176,9 +176,10 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
return result;
}
-void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t curr_offset,
- int32_t err_code, bool esc_limit, int32_t msg_size,
- int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow) {
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
+ int64_t curr_offset, int32_t err_code, bool esc_limit,
+ int32_t msg_size,int64_t limit_dlt, int64_t cur_data_dlt,
+ bool require_slow) {
map<string, PartitionExt>::iterator it_part;
// book partition offset info
if (curr_offset >= 0) {