You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by gx...@apache.org on 2020/09/15 11:46:53 UTC

[incubator-tubemq] 37/50: [TUBEMQ-286]Create C/C++ SDK's manager class (#212)

This is an automated email from the ASF dual-hosted git repository.

gxcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 1fb720f1726023ccdcdbc1e365343682a64a991d
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Jul 20 14:19:37 2020 +0000

    [TUBEMQ-286]Create C/C++ SDK's manager class (#212)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../include/tubemq/client_service.h                | 20 ++++----
 .../tubemq-client-cpp/src/client_service.cc        | 60 +++++++++++++++++++++-
 2 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
index d78a690..5e0b113 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
@@ -48,6 +48,7 @@ class BaseClient {
  public:
   BaseClient(bool is_producer);
   virtual ~BaseClient();
+  virtual void ShutDown();
   void SetClientIndex(int32_t client_index) { client_index_ = client_index; }
   bool IsProducer() { return is_producer_; }
   const int32_t GetClientIndex() { return client_index_; }
@@ -58,26 +59,25 @@ class BaseClient {
 };
 
 
-enum ServiceStatus {
-  kServiceReady = 0,
-  kServiceRunning = 1,
-  kServiceStop = 2,
-};  // enum ServiceStatus
-
-
 class TubeMQService : public Singleton<TubeMQService> {
  public:
-  TubeMQService();
+  // TubeMQService();
+  // ~TubeMQService();
   bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");
   bool Stop(string& err_info);
   bool IsRunning();
-  const int32_t  getServiceStatus() const { return service_status_.Get(); }
+  const int32_t  GetServiceStatus() const { return service_status_.Get(); }
+  int32_t GetClientObjCnt();
   bool AddClientObj(string& err_info,
          BaseClient* client_obj, int32_t& client_index);
   BaseClient* GetClientObj(int32_t client_index) const;
+  BaseClient* RmvClientObj(int32_t client_index);
+  const ExecutorPoolPtr& GetTimerExecutor() const { return timer_executor_; }
+  const ExecutorPoolPtr& GetNetWorkExecutor() const { return network_executor_; }
 
  private:
-  bool iniLogger(const Fileini& fileini, const string& sector);
+  void iniLogger(const Fileini& fileini, const string& sector);
+  void shutDownClinets() const;
 
  private:
   AtomicInteger service_status_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
index bd1da8a..1357123 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
@@ -41,6 +41,18 @@ BaseClient::~BaseClient() {
   // no code
 }
 
+/*
+TubeMQService::TubeMQService() {
+  service_status_.Set(0);
+  client_index_base_.Set(0);
+}
+
+TubeMQService::~TubeMQService() {
+  string err_info;
+  Stop(err_info);
+}
+*/
+
 bool TubeMQService::Start(string& err_info, string conf_file) {
   // check configure file
   bool result = false;
@@ -55,11 +67,29 @@ bool TubeMQService::Start(string& err_info, string conf_file) {
   if (!result) {
     return result;
   }
+  if (!service_status_.CompareAndSet(0,1)) {
+    err_info = "TubeMQ Service has startted or Stopped!";
+    return false;
+  }
   iniLogger(fileini, sector);
+  service_status_.set(2);
+}
+
+// Stops the service.
+//
+// Teardown (client shutdown, then closing both executors) runs only for the
+// caller whose CompareAndSet(2, -1) on service_status_ succeeds; 2 is the
+// value IsRunning() tests for. Every call reports success: err_info is set
+// to "OK!" and true is returned whether or not teardown ran.
+bool TubeMQService::Stop(string& err_info) {
+  const bool do_teardown = service_status_.CompareAndSet(2, -1);
+  if (do_teardown) {
+    // Clients are shut down before the executors are closed.
+    shutDownClinets();
+    timer_executor_->Close();
+    network_executor_->Close();
+  }
+  err_info = "OK!";
+  return true;
 }
 
+// Reports whether service_status_ currently holds 2, the value Start()
+// stores once initialization has finished.
+bool TubeMQService::IsRunning() {
+  const auto cur_status = service_status_.Get();
+  return cur_status == 2;
+}
 
-bool TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
+void TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
   string err_info;
   int32_t log_num = 10;
   int32_t log_size = 10;
@@ -71,9 +101,15 @@ bool TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
   fileini.GetValue(err_info, sector, "log_level", log_level, 4);
   log_level = TUBEMQ_MID(log_level, 0, 4);
   GetLogger().Init(log_path, Logger::Level(log_level), log_size, log_num);
-  return true;
 }
 
+
+// Returns the number of entries in clients_map_, read while holding mutex_.
+int32_t TubeMQService::GetClientObjCnt() {
+  lock_guard<mutex> guard(mutex_);
+  // The container size is converted to the int32_t the interface returns.
+  return static_cast<int32_t>(clients_map_.size());
+}
+
+
 bool TubeMQService::AddClientObj(string& err_info,
            BaseClient* client_obj, int32_t& client_index) {
   if (service_status_.Get() != 0) {
@@ -99,6 +135,26 @@ BaseClient* TubeMQService::GetClientObj(int32_t client_index) const {
   return client_obj;
 }
 
+// Detaches the client registered under client_index from clients_map_.
+//
+// Returns the pointer that was stored under client_index, or NULL when no
+// entry exists for that index. The client object itself is not destroyed
+// here; only the map entry is removed. The lookup and removal are performed
+// while holding mutex_.
+BaseClient* TubeMQService::RmvClientObj(int32_t client_index) {
+  BaseClient* client_obj = NULL;
+
+  lock_guard<mutex> lck(mutex_);
+  map<int32_t, BaseClient*>::iterator it = clients_map_.find(client_index);
+  if (it != clients_map_.end()) {
+    client_obj = it->second;
+    // Erase through the iterator already in hand instead of searching the
+    // map for the same key a second time.
+    clients_map_.erase(it);
+  }
+  return client_obj;
+}
+
+// Calls ShutDown() on every client stored in clients_map_. mutex_ is held
+// for the whole traversal.
+void TubeMQService::shutDownClinets() const {
+  lock_guard<mutex> guard(mutex_);
+  for (const auto& entry : clients_map_) {
+    entry.second->ShutDown();
+  }
+}
 
 
 }  // namespace tubemq