You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by gx...@apache.org on 2020/09/15 11:46:53 UTC
[incubator-tubemq] 37/50: [TUBEMQ-286]Create C/C++ SDK's manager class (#212)
This is an automated email from the ASF dual-hosted git repository.
gxcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 1fb720f1726023ccdcdbc1e365343682a64a991d
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Jul 20 14:19:37 2020 +0000
[TUBEMQ-286]Create C/C++ SDK's manager class (#212)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../include/tubemq/client_service.h | 20 ++++----
.../tubemq-client-cpp/src/client_service.cc | 60 +++++++++++++++++++++-
2 files changed, 68 insertions(+), 12 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
index d78a690..5e0b113 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/client_service.h
@@ -48,6 +48,7 @@ class BaseClient {
public:
BaseClient(bool is_producer);
virtual ~BaseClient();
+ virtual void ShutDown();
void SetClientIndex(int32_t client_index) { client_index_ = client_index; }
bool IsProducer() { return is_producer_; }
const int32_t GetClientIndex() { return client_index_; }
@@ -58,26 +59,25 @@ class BaseClient {
};
-enum ServiceStatus {
- kServiceReady = 0,
- kServiceRunning = 1,
- kServiceStop = 2,
-}; // enum ServiceStatus
-
-
class TubeMQService : public Singleton<TubeMQService> {
public:
- TubeMQService();
+ // TubeMQService();
+ // ~TubeMQService();
bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");
bool Stop(string& err_info);
bool IsRunning();
- const int32_t getServiceStatus() const { return service_status_.Get(); }
+ const int32_t GetServiceStatus() const { return service_status_.Get(); }
+ int32_t GetClientObjCnt();
bool AddClientObj(string& err_info,
BaseClient* client_obj, int32_t& client_index);
BaseClient* GetClientObj(int32_t client_index) const;
+ BaseClient* RmvClientObj(int32_t client_index);
+ const ExecutorPoolPtr& GetTimerExecutor() const { return timer_executor_; }
+ const ExecutorPoolPtr& GetNetWorkExecutor() const { return network_executor_; }
private:
- bool iniLogger(const Fileini& fileini, const string& sector);
+ void iniLogger(const Fileini& fileini, const string& sector);
+ void shutDownClinets() const;
private:
AtomicInteger service_status_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
index bd1da8a..1357123 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
@@ -41,6 +41,18 @@ BaseClient::~BaseClient() {
// no code
}
+/*
+TubeMQService::TubeMQService() {
+ service_status_.Set(0);
+ client_index_base_.Set(0);
+}
+
+TubeMQService::~TubeMQService() {
+ string err_info;
+ Stop(err_info);
+}
+*/
+
bool TubeMQService::Start(string& err_info, string conf_file) {
// check configure file
bool result = false;
@@ -55,11 +67,29 @@ bool TubeMQService::Start(string& err_info, string conf_file) {
if (!result) {
return result;
}
+ if (!service_status_.CompareAndSet(0,1)) {
+ err_info = "TubeMQ Service has startted or Stopped!";
+ return false;
+ }
iniLogger(fileini, sector);
+ service_status_.set(2);
+}
+
+// Stops the service. Teardown runs only when CompareAndSet(2, -1) on
+// service_status_ succeeds (2 is the value IsRunning() tests for): the
+// registered clients are shut down first, then the timer and network
+// executors are closed. err_info is set to "OK!" and true is returned on
+// every path, including when no teardown was performed.
+bool TubeMQService::Stop(string& err_info) {
+  const bool do_teardown = service_status_.CompareAndSet(2, -1);
+  if (do_teardown) {
+    shutDownClinets();
+    timer_executor_->Close();
+    network_executor_->Close();
+  }
+  err_info = "OK!";
+  return true;
}
+// Reports whether service_status_ currently equals 2, the value Start()
+// stores after the logger has been initialised.
+bool TubeMQService::IsRunning() {
+  const int32_t current_status = service_status_.Get();
+  return current_status == 2;
+}
-bool TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
+void TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
string err_info;
int32_t log_num = 10;
int32_t log_size = 10;
@@ -71,9 +101,15 @@ bool TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
fileini.GetValue(err_info, sector, "log_level", log_level, 4);
log_level = TUBEMQ_MID(log_level, 0, 4);
GetLogger().Init(log_path, Logger::Level(log_level), log_size, log_num);
- return true;
}
+
+// Returns the number of entries currently stored in clients_map_, read
+// while holding mutex_.
+int32_t TubeMQService::GetClientObjCnt() {
+  lock_guard<mutex> guard(mutex_);
+  const int32_t client_count = static_cast<int32_t>(clients_map_.size());
+  return client_count;
+}
+
+
bool TubeMQService::AddClientObj(string& err_info,
BaseClient* client_obj, int32_t& client_index) {
if (service_status_.Get() != 0) {
@@ -99,6 +135,26 @@ BaseClient* TubeMQService::GetClientObj(int32_t client_index) const {
return client_obj;
}
+// Removes the client registered under client_index from clients_map_ and
+// returns it; returns NULL when no such index is registered. The object is
+// only unlinked from the map here - it is neither shut down nor destroyed.
+// The lookup and removal are done while holding mutex_.
+BaseClient* TubeMQService::RmvClientObj(int32_t client_index) {
+  lock_guard<mutex> lck(mutex_);
+  map<int32_t, BaseClient*>::iterator it = clients_map_.find(client_index);
+  if (it == clients_map_.end()) {
+    return NULL;
+  }
+  BaseClient* client_obj = it->second;
+  // Erase through the iterator already in hand rather than by key, which
+  // would search the map a second time.
+  clients_map_.erase(it);
+  return client_obj;
+}
+
+// Calls ShutDown() on every client stored in clients_map_, holding mutex_
+// for the whole pass.
+void TubeMQService::shutDownClinets() const {
+  lock_guard<mutex> guard(mutex_);
+  for (const auto& entry : clients_map_) {
+    entry.second->ShutDown();
+  }
+}
} // namespace tubemq