You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/06 07:54:27 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-252] Create C/C++ Metadata classes (#187)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 3884712  [TUBEMQ-252] Create C/C++ Metadata classes (#187)
3884712 is described below

commit 3884712d832c098d8e47b71f011736641eb9b1a1
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Jul 6 07:54:20 2020 +0000

    [TUBEMQ-252] Create C/C++ Metadata classes (#187)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq-client-cpp/inc/meta_info.h              |  62 ++++-
 .../tubemq-client-cpp/src/message.cc               |   2 +-
 .../tubemq-client-cpp/src/meta_info.cc             | 309 ++++++++++++++++++++-
 3 files changed, 369 insertions(+), 4 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/inc/meta_info.h
index 55baa4c..6264657 100644
--- a/tubemq-client-twins/tubemq-client-cpp/inc/meta_info.h
+++ b/tubemq-client-twins/tubemq-client-cpp/inc/meta_info.h
@@ -20,6 +20,7 @@
 #ifndef _TUBEMQ_CLIENT_META_INFO_H_
 #define _TUBEMQ_CLIENT_META_INFO_H_
 
+#include <list>
 #include <string>
 
 namespace tubemq {
@@ -62,7 +63,7 @@ class Partition {
  public:
   Partition();
   Partition(const string& partition_info);
-  Partition(const NodeInfo& broker_info, const string& partStr);
+  Partition(const NodeInfo& broker_info, const string& part_str);
   Partition(const NodeInfo& broker_info, const string& topic, int partition_id);
   ~Partition();
   Partition& operator=(const Partition& target);
@@ -88,6 +89,65 @@ class Partition {
 };
 
 
+// Binds a consumer (consumer id + group) to one Partition and caches the
+// string form of that binding, which ToString() returns.
+class SubscribeInfo {
+ public:
+  // Builds the object from its string form; the parsing lives in meta_info.cc.
+  SubscribeInfo(const string& sub_info);
+  // Builds the object from already separated fields.
+  SubscribeInfo(const string& consumer_id, const string& group, const Partition& partition);
+  SubscribeInfo& operator=(const SubscribeInfo& target);
+  const string& GetConsumerId() const;
+  const string& GetGroup() const;
+  const Partition& GetPartition() const;
+  // Misspelled accessor name, kept so that existing callers keep compiling.
+  const int GgetBrokerId() const;
+  // Correctly spelled accessor; forwards to the misspelled original above.
+  const int GetBrokerId() const { return GgetBrokerId(); }
+  const string& GetBrokerHost() const;
+  const int GetBrokerPort() const;
+  const string& GetTopic() const;
+  const int GetPartitionId() const;
+  // Returns the cached string form produced by buildSubInfo().
+  const string& ToString() const;
+
+ private:
+  // Regenerates sub_info_ from the other members.
+  void buildSubInfo();
+
+ private:
+  string    consumer_id_;
+  string    group_;
+  Partition partition_;
+  string    sub_info_;
+};
+
+
+// Value type carrying a rebalance id, an event type, an event status and
+// the list of SubscribeInfo entries attached to the event.
+class ConsumerEvent {
+ public:
+  ConsumerEvent();
+  ConsumerEvent(const ConsumerEvent& target);
+  ConsumerEvent(long rebalance_id, int event_type,
+                const list<SubscribeInfo>& subscribeInfo_lst,
+                int event_status);
+  ConsumerEvent& operator=(const ConsumerEvent& target);
+
+  // Read accessors.
+  const long GetRebalanceId() const;
+  const int  GetEventType() const;
+  const int  GetEventStatus() const;
+  const list<SubscribeInfo>& GetSubscribeInfoList() const;
+
+  // Mutators; only the type and the status can be changed after construction.
+  void SetEventType(int event_type);
+  void SetEventStatus(int event_status);
+
+  // Returns a human-readable dump of all fields.
+  string ToString();
+
+ private:
+  long rebalance_id_;
+  int  event_type_;
+  int  event_status_;
+  list<SubscribeInfo> subscribe_list_;
+};
+
+
+// Extension of Partition; at this commit it declares nothing but a default
+// constructor.
+// NOTE(review): there is no access specifier, so the constructor is private
+// (the default for `class`), and no definition of it is visible in this
+// commit -- presumably a placeholder to be filled in later; confirm.
+class PartitionExt : public Partition {
+  PartitionExt();
+  
+
+};
+
+
+
+
+
 }
 
 #endif
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/message.cc b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
index c5162e3..e737a41 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/message.cc
@@ -190,7 +190,7 @@ bool Message::AddProperty(string& err_info, const string& key, const string& val
     err_info = ss.str();
     return false;
   }
-  if(trimed_key == kRsvPropKeyFilterItem 
+  if (trimed_key == kRsvPropKeyFilterItem 
     || trimed_key == kRsvPropKeyMsgTime) {
     stringstream ss;
     ss << "Reserved token '";
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index c744b47..8024a6a 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -18,6 +18,7 @@
  */
 
 #include <sstream>
+#include <vector>
 #include <stdlib.h>
 #include "utils.h"
 #include "meta_info.h"
@@ -34,6 +35,7 @@ NodeInfo::NodeInfo() {
   buildStrInfo();
 }
 
+// node_info = node_id:host:port
 NodeInfo::NodeInfo(bool is_broker, const string& node_info) {
   vector<string> result;
   Utils::Split(node_info, result, delimiter::kDelimiterColon);
@@ -41,7 +43,7 @@ NodeInfo::NodeInfo(bool is_broker, const string& node_info) {
     this->node_id_   = atoi(result[0].c_str());
     this->node_host_ = result[1];
     this->node_port_ = config::kBrokerPortDef;
-    if(result.size() >= 3){
+    if (result.size() >= 3){
       this->node_port_ = atoi(result[2].c_str());
     }
   } else {
@@ -97,7 +99,7 @@ bool NodeInfo::operator== (const NodeInfo& target) {
 }
 
 bool NodeInfo::operator< (const NodeInfo& target) const {
-  return this->node_id_ < target.node_id_;
+  return this->node_info_ < target.node_info_;
 }
   
 const int NodeInfo::GetNodeId() const {
@@ -135,5 +137,308 @@ void NodeInfo::buildStrInfo() {
 }
 
 
+// Default state: placeholder topic " ", invalid partition id, and a
+// default-constructed broker_info_; the cached strings are built from those.
+Partition::Partition() {
+  partition_id_ = config::kInvalidValue;
+  topic_ = " ";
+  buildPartitionKey();
 }
 
+// partition_info = broker_info#topic:partitionId
+//
+// The text before the first kDelimiterPound is parsed as a broker NodeInfo;
+// the rest is split at its first kDelimiterColon into topic and partition id.
+// Fields that cannot be located keep their placeholder values.
+Partition::Partition(const string& partition_info) {
+  // placeholders used when parsing fails
+  topic_ = " ";
+  partition_id_ = config::kInvalidValue;
+  string pound = delimiter::kDelimiterPound;
+  string colon = delimiter::kDelimiterColon;
+  string::size_type pound_pos = partition_info.find(pound);
+  if (pound_pos != string::npos) {
+    // broker section
+    string broker_part = partition_info.substr(0, pound_pos);
+    broker_part = Utils::Trim(broker_part);
+    broker_info_ = NodeInfo(true, broker_part);
+    // topic:partitionId section
+    string tail = partition_info.substr(pound_pos + pound.size());
+    tail = Utils::Trim(tail);
+    string::size_type colon_pos = tail.find(colon);
+    if (colon_pos != string::npos) {
+      string topic_part = tail.substr(0, colon_pos);
+      string id_part = tail.substr(colon_pos + colon.size());
+      topic_part = Utils::Trim(topic_part);
+      id_part = Utils::Trim(id_part);
+      topic_ = topic_part;
+      partition_id_ = atoi(id_part.c_str());
+    }
+  }
+  buildPartitionKey();
+}
+  
+// part_str = topic:partition_id
+//
+// Uses the given broker as is and takes topic and partition id from the
+// first two kDelimiterColon-separated fields of part_str; with fewer than
+// two fields the placeholder topic and the invalid id are kept.
+Partition::Partition(const NodeInfo& broker_info, const string& part_str) {
+  broker_info_ = broker_info;
+  topic_ = " ";
+  partition_id_ = config::kInvalidValue;
+  vector<string> fields;
+  Utils::Split(part_str, fields, delimiter::kDelimiterColon);
+  if (fields.size() > 1) {
+    topic_ = fields[0];
+    partition_id_ = atoi(fields[1].c_str());
+  }
+  buildPartitionKey();
+}
+
+// Builds a partition from already separated fields, then derives the cached
+// key/info strings from them.
+Partition::Partition(const NodeInfo& broker_info, const string& topic, int partition_id) {
+  broker_info_ = broker_info;
+  topic_ = topic;
+  partition_id_ = partition_id;
+  buildPartitionKey();
+}
+
+// Nothing to release explicitly; the body is intentionally empty.
+Partition::~Partition() {
+}
+
+// Copies every field, including both cached strings; self-assignment is a
+// no-op.
+Partition& Partition::operator=(const Partition& target) {
+  if (this == &target) {
+    return *this;
+  }
+  broker_info_ = target.broker_info_;
+  topic_ = target.topic_;
+  partition_id_ = target.partition_id_;
+  partition_key_ = target.partition_key_;
+  partition_info_ = target.partition_info_;
+  return *this;
+}
+
+// Two partitions are equal when they are the same object or when their
+// cached partition_info_ strings match.
+bool Partition::operator== (const Partition& target) {
+  return this == &target || partition_info_ == target.partition_info_;
+}
+
+// Returns the node id reported by broker_info_.
+const int Partition::GetBrokerId() const {
+  return broker_info_.GetNodeId();
+}
+
+// Returns the host reported by broker_info_.
+const string& Partition::GetBrokerHost() const {
+  return broker_info_.GetHost();
+}
+
+// Returns the port reported by broker_info_.
+const int Partition::GetBrokerPort() const {
+  return broker_info_.GetPort();
+}
+
+// Returns the cached key string assembled by buildPartitionKey().
+const string& Partition::GetPartitionKey() const {
+  return partition_key_;
+}
+
+// Returns the stored topic name.
+const string& Partition::GetTopic() const {
+  return topic_;
+}
+
+// Returns the stored broker NodeInfo.
+const NodeInfo& Partition::GetBrokerInfo() const {
+  return broker_info_;
+}
+
+// Returns the stored partition id.
+const int Partition::GetPartitionId() const {
+  return partition_id_;
+}
+
+// Returns the cached info string assembled by buildPartitionKey().
+const string& Partition::ToString() const {
+  return partition_info_;
+}
+
+// Rebuilds both cached strings from the current fields:
+//   partition_key_  = <broker node id> kDelimiterColon <topic> kDelimiterColon <partition id>
+//   partition_info_ = <broker node info> kDelimiterPound <topic> kDelimiterColon <partition id>
+void Partition::buildPartitionKey() {
+  stringstream key_stream;
+  key_stream << broker_info_.GetNodeId() << delimiter::kDelimiterColon
+             << topic_ << delimiter::kDelimiterColon
+             << partition_id_;
+  partition_key_ = key_stream.str();
+
+  stringstream info_stream;
+  info_stream << broker_info_.GetNodeInfo() << delimiter::kDelimiterPound
+              << topic_ << delimiter::kDelimiterColon
+              << partition_id_;
+  partition_info_ = info_stream.str();
+}
+
+
+// sub_info = consumerId@group#broker_info#topic:partitionId
+//
+// Splits sub_info at the first kDelimiterPound into a consumer section
+// ("consumerId@group") and a partition section that is handed to
+// Partition(const string&). When the pound delimiter is missing, consumer_id_
+// and group_ keep the " " placeholder and partition_ stays default-built.
+SubscribeInfo::SubscribeInfo(const string& sub_info) {
+  string::size_type pos = 0;
+  string seg_key = delimiter::kDelimiterPound;
+  string at_key = delimiter::kDelimiterAt;
+  this->consumer_id_ = " ";
+  this->group_ = " ";
+  // parse sub_info
+  pos = sub_info.find(seg_key);
+  if (pos != string::npos) {
+    string consumer_info = sub_info.substr(0, pos);
+    consumer_info = Utils::Trim(consumer_info);
+    string partition_info = sub_info.substr(pos + seg_key.size(), sub_info.size());
+    partition_info = Utils::Trim(partition_info);
+    this->partition_ = Partition(partition_info);
+    pos = consumer_info.find(at_key);
+    // substr(0, npos) yields the whole string, so a consumer section without
+    // the at delimiter is kept verbatim as the consumer id.
+    this->consumer_id_ = consumer_info.substr(0, pos);
+    this->consumer_id_ = Utils::Trim(this->consumer_id_);
+    // Only extract the group when the delimiter was found: with pos == npos
+    // the offset pos + at_key.size() wraps around and would make group_ a
+    // copy of (part of) the consumer section instead of the placeholder.
+    if (pos != string::npos) {
+      this->group_ = consumer_info.substr(pos + at_key.size(), consumer_info.size());
+      this->group_ = Utils::Trim(this->group_);
+    }
+  }
+  buildSubInfo();
+}
+
+// Builds the object from already separated fields and derives the cached
+// string form from them.
+SubscribeInfo::SubscribeInfo(const string& consumer_id,
+                 const string& group, const Partition& partition) {
+  partition_ = partition;
+  group_ = group;
+  consumer_id_ = consumer_id;
+  buildSubInfo();
+}
+
+
+// Copies every field of target; self-assignment is a no-op.
+SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) {
+  if (this != &target) {
+    this->consumer_id_ = target.consumer_id_;
+    this->group_       = target.group_;
+    this->partition_   = target.partition_;
+    // The cached string form must follow the fields it was built from;
+    // otherwise ToString() keeps returning the pre-assignment text.
+    this->sub_info_    = target.sub_info_;
+  }
+  return *this;
+}
+
+// Returns the stored consumer id.
+const string& SubscribeInfo::GetConsumerId() const {
+  return consumer_id_;
+}
+
+// Returns the stored group name.
+const string& SubscribeInfo::GetGroup() const {
+  return group_;
+}
+
+// Returns the stored Partition.
+const Partition& SubscribeInfo::GetPartition() const {
+  return partition_;
+}
+
+// Returns the broker id of the stored partition. The name is misspelled
+// ("Gget") in the class declaration and has to match it here.
+const int SubscribeInfo::GgetBrokerId() const {
+  return partition_.GetBrokerId();
+}
+
+// Returns the broker host of the stored partition.
+const string& SubscribeInfo::GetBrokerHost() const {
+  return partition_.GetBrokerHost();
+}
+
+// Returns the broker port of the stored partition.
+const int SubscribeInfo::GetBrokerPort() const {
+  return partition_.GetBrokerPort();
+}
+
+// Returns the topic of the stored partition.
+const string& SubscribeInfo::GetTopic() const {
+  return partition_.GetTopic();
+}
+
+// Returns the id of the stored partition.
+const int SubscribeInfo::GetPartitionId() const {
+  return partition_.GetPartitionId();
+}
+
+// Returns the cached string form assembled by buildSubInfo().
+const string& SubscribeInfo::ToString() const {
+  return sub_info_;
+}
+
+// Rebuilds sub_info_ as:
+//   <consumer id> kDelimiterAt <group> kDelimiterPound <partition string>
+void SubscribeInfo::buildSubInfo() {
+  stringstream out;
+  out << consumer_id_ << delimiter::kDelimiterAt
+      << group_ << delimiter::kDelimiterPound
+      << partition_.ToString();
+  sub_info_ = out.str();
+}
+
+
+// Default state: every numeric field is config::kInvalidValue and the
+// subscribe list is empty.
+ConsumerEvent::ConsumerEvent() {
+  rebalance_id_ = config::kInvalidValue;
+  event_type_ = config::kInvalidValue;
+  event_status_ = config::kInvalidValue;
+}
+
+// Copy constructor: takes over all four fields of target.
+ConsumerEvent::ConsumerEvent(const ConsumerEvent& target) {
+  subscribe_list_ = target.subscribe_list_;
+  rebalance_id_ = target.rebalance_id_;
+  event_type_ = target.event_type_;
+  event_status_ = target.event_status_;
+}
+
+// Builds an event from explicit values; the subscribe entries are appended
+// to this object's own list in their original order.
+ConsumerEvent::ConsumerEvent(long rebalance_id,int event_type, 
+    const list<SubscribeInfo>& subscribeInfo_lst, int event_status) {
+  rebalance_id_ = rebalance_id;
+  event_type_ = event_type;
+  event_status_ = event_status;
+  // Range insert copy-constructs each element, like the push_back loop did.
+  subscribe_list_.insert(subscribe_list_.end(),
+                         subscribeInfo_lst.begin(), subscribeInfo_lst.end());
+}
+
+// Copies all four fields of target; self-assignment is a no-op.
+ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) {
+  if (this == &target) {
+    return *this;
+  }
+  rebalance_id_ = target.rebalance_id_;
+  event_type_ = target.event_type_;
+  event_status_ = target.event_status_;
+  subscribe_list_ = target.subscribe_list_;
+  return *this;
+}
+
+// Returns the stored rebalance id.
+const long ConsumerEvent::GetRebalanceId() const {
+  return rebalance_id_;
+}
+
+// Returns the stored event type.
+const int ConsumerEvent::GetEventType() const {
+  return event_type_;
+}
+
+// Returns the stored event status.
+const int ConsumerEvent::GetEventStatus() const {
+  return event_status_;
+}
+
+// Overwrites the stored event type.
+void ConsumerEvent::SetEventType(int event_type) {
+  event_type_ = event_type;
+}
+
+// Overwrites the stored event status.
+void ConsumerEvent::SetEventStatus(int event_status) {
+  event_status_ = event_status;
+}
+
+// Returns a read-only reference to the stored subscribe list.
+const list<SubscribeInfo>& ConsumerEvent::GetSubscribeInfoList() const {
+  return subscribe_list_;
+}
+
+// Formats the event as
+//   ConsumerEvent [rebalanceId=..., type=..., status=..., subscribeInfoList=[a,b,...]]
+// where each list entry is the ToString() of one SubscribeInfo.
+string ConsumerEvent::ToString() {
+  stringstream out;
+  out << "ConsumerEvent [rebalanceId=" << rebalance_id_
+      << ", type=" << event_type_
+      << ", status=" << event_status_
+      << ", subscribeInfoList=[";
+  // A comma goes before every entry except the first one.
+  bool first = true;
+  list<SubscribeInfo>::const_iterator it = subscribe_list_.begin();
+  for (; it != subscribe_list_.end(); ++it) {
+    if (!first) {
+      out << ",";
+    }
+    first = false;
+    out << it->ToString();
+  }
+  out << "]]";
+  return out.str();
+}
+
+};
+