You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/08/18 16:13:19 UTC
[2/4] incubator-singa git commit: SINGA-21 Code review 3
SINGA-21 Code review 3
review cluster.h, cluster.cc
-- change the first Get() of cluster to Setup()
-- change shared pointers to raw pointers
-- let cluster be a singleton
-- remove unused endpoints_ field
-- format the code
In cluster_rt.h/cc
-- remove ClusterRuntime interface
-- rename ZKClusterRT to ClusterRuntime
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/b24f0a32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/b24f0a32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/b24f0a32
Branch: refs/heads/master
Commit: b24f0a32dd55c43b274e021ab4a53011943f83c7
Parents: e28b039
Author: wang sheng <wa...@gmail.com>
Authored: Tue Aug 18 15:39:02 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Tue Aug 18 16:01:08 2015 +0800
----------------------------------------------------------------------
include/utils/cluster.h | 158 +++++++++++++++++-----------------------
include/utils/cluster_rt.h | 95 +++++++++++-------------
src/proto/common.proto | 2 +-
src/trainer/trainer.cc | 2 +-
src/trainer/worker.cc | 1 -
src/utils/cluster.cc | 125 +++++++++++++++----------------
src/utils/cluster_rt.cc | 22 +++---
7 files changed, 179 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index e2c979d..be0e0de 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -1,19 +1,16 @@
-#ifndef INCLUDE_UTILS_CLUSTER_H_
-#define INCLUDE_UTILS_CLUSTER_H_
+#ifndef SINGA_UTILS_CLUSTER_H_
+#define SINGA_UTILS_CLUSTER_H_
+
#include <glog/logging.h>
#include <string>
-#include <utility>
+#include <unordered_map>
#include <memory>
#include <vector>
-#include <unordered_map>
-#include "utils/common.h"
#include "proto/job.pb.h"
#include "proto/singa.pb.h"
#include "utils/cluster_rt.h"
-
-using std::shared_ptr;
-using std::string;
-using std::vector;
+#include "utils/common.h"
+#include "utils/singleton.h"
namespace singa {
@@ -24,113 +21,91 @@ namespace singa {
*/
class Cluster {
public:
- static shared_ptr<Cluster> Get();
- static shared_ptr<Cluster> Get(int job_id,
- const SingaProto& singaConf, const ClusterProto& clusterConf);
-
- const int nserver_groups()const{ return cluster_.nserver_groups(); }
- const int nworker_groups()const { return cluster_.nworker_groups(); }
- int nworkers_per_group()const {return cluster_.nworkers_per_group();}
- int nservers_per_group()const {return cluster_.nservers_per_group();}
- int nworkers_per_procs()const{return cluster_.nworkers_per_procs();}
- int nservers_per_procs()const{return cluster_.nservers_per_procs();}
- int nworker_groups_per_server_group() const {
- if(nserver_groups()==0||nservers_per_group()==0)
+ // Cluster is a global singleton in a process
+ static Cluster* Setup(int job_id, const SingaProto& singaConf,
+ const ClusterProto& clusterConf);
+ static Cluster* Get();
+
+ inline int nserver_groups() const { return cluster_.nserver_groups(); }
+ inline int nworker_groups() const { return cluster_.nworker_groups(); }
+ inline int nworkers_per_group() const { return cluster_.nworkers_per_group();}
+ inline int nservers_per_group() const { return cluster_.nservers_per_group();}
+ inline int nworkers_per_procs() const { return cluster_.nworkers_per_procs();}
+ inline int nservers_per_procs() const { return cluster_.nservers_per_procs();}
+ inline int nworker_groups_per_server_group() const {
+ if (nserver_groups() == 0 || nservers_per_group() == 0)
return 1;
else
- return cluster_.nworker_groups()/cluster_.nserver_groups();
+ return cluster_.nworker_groups() / cluster_.nserver_groups();
}
-
/**
* @return true if the calling procs has server threads, otherwise false
*/
- bool has_server()const {
- if(server_worker_separate()){
+ inline bool has_server() const {
+ if (server_worker_separate()) {
CHECK_LT(procs_id_, nprocs_);
- return procs_id_>=nworker_procs();
- }else
- return procs_id_<nserver_procs();
+ return procs_id_ >= nworker_procs();
+ } else {
+ return procs_id_ < nserver_procs();
+ }
}
/**
* @return true if the calling procs has worker threads.
*/
- bool has_worker()const {
- return procs_id_<nworker_procs();
+ inline bool has_worker() const {
+ return procs_id_ < nworker_procs();
}
/**
* @return global procs id, which starts from 0.
*/
- int procs_id() const {return procs_id_;}
- void set_procs_id(int procs_id) {procs_id_ = procs_id;}
- bool server_worker_separate() const {
+ inline int procs_id() const { return procs_id_; }
+ inline void set_procs_id(int procs_id) { procs_id_ = procs_id; }
+ inline bool server_worker_separate() const {
return cluster_.server_worker_separate();
}
- int nworker_procs() const {
- return nworker_groups()*nworkers_per_group()/nworkers_per_procs();
- }
- int nserver_procs() const {
- return nserver_groups()*nservers_per_group()/nservers_per_procs();
+ inline int nworker_procs() const {
+ return nworker_groups() * nworkers_per_group() / nworkers_per_procs();
}
- int nprocs() const {
- return nprocs_;
+ inline int nserver_procs() const {
+ return nserver_groups() * nservers_per_group() / nservers_per_procs();
}
-
-
+ inline int nprocs() const { return nprocs_; }
/**
* @return endpoint of the router of a procs with the specified id
*/
- const string endpoint(const int procs_id) const;
-
- const string workspace() {return cluster_.workspace();}
-
- const string vis_folder() const {
- return cluster_.workspace()+"/visualization";
- }
- const string checkpoint_folder() const {
- return cluster_.workspace()+"/checkpoint";
+ inline std::string endpoint(int procs_id) const {
+ CHECK_LT(procs_id, nprocs());
+ CHECK_GE(procs_id, 0);
+ return cluster_rt_->GetProcHost(procs_id);
}
- /*
- const int stub_timeout() const {
- return cluster_.stub_timeout();
- }
- const int worker_timeout() const {
- return cluster_.worker_timeout();
+ inline std::string workspace() const { return cluster_.workspace(); }
+ inline std::string vis_folder() const {
+ return cluster_.workspace() + "/visualization";
}
- const int server_timeout() const {
- return cluster_.server_timeout();
+ inline std::string checkpoint_folder() const {
+ return cluster_.workspace() + "/checkpoint";
}
+ /*
+ const int stub_timeout() const { return cluster_.stub_timeout(); }
+ const int worker_timeout() const { return cluster_.worker_timeout(); }
+ const int server_timeout() const { return cluster_.server_timeout(); }
*/
-
- const bool server_update() const {
- return cluster_.server_update();
- }
-
- const bool share_memory() const {
- return cluster_.share_memory();
- }
-
+ inline bool server_update() const { return cluster_.server_update(); }
+ inline bool share_memory() const { return cluster_.share_memory(); }
/**
* bandwidth Bytes/s
*/
- const int bandwidth() const {
- return cluster_.bandwidth();
- }
-
- const int poll_time() const {
- return cluster_.poll_time();
- }
-
- shared_ptr<ClusterRuntime> runtime() const {
- return cluster_rt_;
- }
+ inline int bandwidth() const { return cluster_.bandwidth(); }
+ inline int poll_time() const { return cluster_.poll_time(); }
+ ClusterRuntime* runtime() const { return cluster_rt_; }
/**
* @return logical procs ID
*/
- int ProcsIDOf(int group_id, int id, int flag);
- const string hostip() const {
- return hostip_;
+ inline int ProcsIDOf(int group_id, int id, int flag) {
+ return procs_ids_.at(Hash(group_id, id, flag));
}
+ inline std::string hostip() const { return hostip_; }
/**
* Register this process.
*
@@ -138,27 +113,24 @@ class Cluster {
* logical process ID.
* @param endpoint unique string for other procs to connect
*/
- void Register(int pid, const string& endpoint);
+ void Register(int pid, const std::string& endpoint);
private:
- Cluster(int job, const SingaProto& singaConf, const ClusterProto& clusterConf);
+ void Init(int job, const SingaProto& singaConf,
+ const ClusterProto& clusterConf);
void SetupFolders(const ClusterProto &cluster);
int Hash(int gid, int id, int flag);
- private:
- int procs_id_;
- int nprocs_;
- string hostip_;
- std::vector<std::string> endpoints_;
+ int procs_id_ = -1;
+ int nprocs_ = 0;
+ std::string hostip_ = "";
// cluster config proto
ClusterProto cluster_;
SingaProto singa_;
- shared_ptr<ClusterRuntime> cluster_rt_;
- // make this class a singlton
- static shared_ptr<Cluster> instance_;
+ ClusterRuntime* cluster_rt_ = nullptr;
std::unordered_map<int, int> procs_ids_;
};
} // namespace singa
-#endif // INCLUDE_UTILS_CLUSTER_H_
+#endif // SINGA_UTILS_CLUSTER_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 4c93c25..49f6e4d 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -9,46 +9,6 @@ namespace singa {
typedef void (*rt_callback)(void *contest);
-/**
- * ClusterRuntime is a runtime service that manages dynamic configuration
- * and status of the whole cluster. It mainly provides following services:
- * 1) Provide running status of each server/worker
- * 2) Translate process id to (hostname:port)
- */
-class ClusterRuntime {
- public:
- virtual ~ClusterRuntime() {}
- /**
- * Initialize the runtime instance
- */
- virtual bool Init() = 0;
- /**
- * register the process, and get a unique process id
- *
- * \return the process id, -1 if failed
- */
- virtual int RegistProc(const std::string& host_addr, int pid) = 0;
- /**
- * translate the process id to host address
- *
- * \return the host and port, "" if no such proc id
- */
- virtual std::string GetProcHost(int proc_id) = 0;
- /**
- * Server: watch all workers in a server group,
- * will be notified when all workers have left
- */
- virtual bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) = 0;
- /**
- * Worker: join a server group (i.e. start to read/update these servers)
- */
- virtual bool JoinSGroup(int gid, int wid, int s_group) = 0;
- /**
- * Worker: leave a server group (i.e. finish its all work)
- */
- virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0;
-};
-
const int kZKBufSize = 100;
// following paths are global
const std::string kZKPathSinga = "/singa";
@@ -78,10 +38,14 @@ struct JobInfo {
std::string name;
};
+/*
+ * A wrapper for zookeeper service which handles error code and reconnections
+ */
class ZKService {
public:
static void ChildChanges(zhandle_t* zh, int type, int state,
const char *path, void* watcherCtx);
+
~ZKService();
bool Init(const std::string& host, int timeout);
bool CreateNode(const char* path, const char* val, int flag, char* output);
@@ -102,18 +66,47 @@ class ZKService {
zhandle_t* zkhandle_ = nullptr;
};
-class ZKClusterRT : public ClusterRuntime {
+/**
+ * ClusterRuntime is a runtime service that manages dynamic configuration
+ * and status of the whole cluster. It mainly provides following services:
+ * 1) Provide running status of each server/worker
+ * 2) Translate process id to (hostname:port)
+ */
+class ClusterRuntime {
public:
- ZKClusterRT(const std::string& host, int job_id);
- ZKClusterRT(const std::string& host, int job_id, int timeout);
- ~ZKClusterRT() override;
-
- bool Init() override;
- int RegistProc(const std::string& host_addr, int pid) override;
- std::string GetProcHost(int proc_id) override;
- bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override;
- bool JoinSGroup(int gid, int wid, int s_group) override;
- bool LeaveSGroup(int gid, int wid, int s_group) override;
+ ClusterRuntime(const std::string& host, int job_id);
+ ClusterRuntime(const std::string& host, int job_id, int timeout);
+ ~ClusterRuntime();
+
+ /**
+ * Initialize the runtime instance
+ */
+ bool Init();
+ /**
+ * register the process, and get a unique process id
+ *
+ * \return the process id, -1 if failed
+ */
+ int RegistProc(const std::string& host_addr, int pid);
+ /**
+ * translate the process id to host address
+ *
+ * \return the host and port, "" if no such proc id
+ */
+ std::string GetProcHost(int proc_id);
+ /**
+ * Server: watch all workers in a server group,
+ * will be notified when all workers have left
+ */
+ bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx);
+ /**
+ * Worker: join a server group (i.e. start to read/update these servers)
+ */
+ bool JoinSGroup(int gid, int wid, int s_group);
+ /**
+ * Worker: leave a server group (i.e. finish its all work)
+ */
+ bool LeaveSGroup(int gid, int wid, int s_group);
private:
inline std::string groupPath(int gid) {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/proto/common.proto
----------------------------------------------------------------------
diff --git a/src/proto/common.proto b/src/proto/common.proto
index 3b6efb3..8927f2b 100644
--- a/src/proto/common.proto
+++ b/src/proto/common.proto
@@ -84,7 +84,7 @@ message SingleLabelImageRecord {
}
message MetricProto {
- repeated string name =1;
+ repeated string name = 1;
repeated int32 count = 2;
repeated float val = 3;
}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index ea060ec..fe90bc4 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -226,7 +226,7 @@ void Trainer::Resume(JobProto* jobConf) {
void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
// register job to zookeeper at the beginning
- auto cluster = Cluster::Get(job->id(), singaConf, job->cluster());
+ auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster());
if (resume)
Resume(job);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 6052f3a..3b1617d 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -25,7 +25,6 @@ void Worker::Setup(
train_net_ = train_net;
validation_net_ = valid_net;
test_net_ = test_net;
- auto cluster = Cluster::Get();
}
Worker::~Worker() {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 6dad2a8..9664064 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -1,97 +1,86 @@
-#include <glog/logging.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <fstream>
#include "utils/cluster.h"
-#include "proto/common.pb.h"
+
#include <sys/stat.h>
#include <sys/types.h>
+#include <unistd.h>
+#include <fstream>
+
namespace singa {
-std::shared_ptr<Cluster> Cluster::instance_;
-Cluster::Cluster(
- int job, const SingaProto& singaConf, const ClusterProto& clusterConf) {
+Cluster* Cluster::Setup(int job, const SingaProto& singaConf,
+ const ClusterProto& clusterConf) {
+ Singleton<Cluster>::Instance()->Init(job, singaConf, clusterConf);
+ return Singleton<Cluster>::Instance();
+}
+
+Cluster* Cluster::Get() {
+ if (!Singleton<Cluster>::Instance()->nprocs_) {
+ LOG(ERROR) << "The first call to Get should "
+ << "provide the sys/model conf path";
+ }
+ return Singleton<Cluster>::Instance();
+}
+
+void Cluster::Register(int pid, const std::string& endpoint) {
+ procs_id_ = cluster_rt_->RegistProc(endpoint, pid);
+ CHECK_GE(procs_id_, 0);
+ CHECK_LT(procs_id_, nprocs());
+ LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint
+ << " (pid = " << pid << ")";
+}
+
+void Cluster::Init(int job, const SingaProto& singaConf,
+ const ClusterProto& clusterConf) {
cluster_ = clusterConf;
singa_ = singaConf;
SetupFolders(clusterConf);
- if(server_worker_separate())
- nprocs_=nworker_procs()+nserver_procs();
+ if (server_worker_separate())
+ nprocs_ = nworker_procs() + nserver_procs();
else
- nprocs_=std::max(nworker_procs(), nserver_procs());
+ nprocs_ = std::max(nworker_procs(), nserver_procs());
// locate the process id of every worker/server
- int ngrps=cluster_.nworker_groups(), grp_size=cluster_.nworkers_per_group();
- int procs=0;
- for(int i=0;i<ngrps;i++){
- for(int j=0;j<grp_size;j++){
- procs=(i*grp_size+j) / cluster_.nworkers_per_procs();
- procs_ids_[Hash(i,j,kWorkerLayer)]=procs;
- procs_ids_[Hash(i,j,kWorkerParam)]=procs;
+ int ngrps = cluster_.nworker_groups();
+ int grp_size = cluster_.nworkers_per_group();
+ int procs = 0;
+ for (int i = 0; i < ngrps; ++i) {
+ for (int j = 0; j < grp_size; ++j) {
+ procs = (i * grp_size + j) / cluster_.nworkers_per_procs();
+ procs_ids_[Hash(i, j, kWorkerLayer)] = procs;
+ procs_ids_[Hash(i, j, kWorkerParam)] = procs;
}
}
- int offset=cluster_.server_worker_separate()? procs+1:0;
- ngrps=cluster_.nserver_groups(), grp_size=cluster_.nservers_per_group();
- for(int i=0;i<ngrps;i++){
- for(int j=0;j<grp_size;j++){
- procs_ids_[Hash(i,j,kServer)]=(i*grp_size+j) / cluster_.nservers_per_procs()+offset;
+ int offset = cluster_.server_worker_separate() ? procs + 1 : 0;
+ ngrps = cluster_.nserver_groups();
+ grp_size = cluster_.nservers_per_group();
+ for (int i = 0; i < ngrps; ++i) {
+ for (int j = 0; j < grp_size; ++j) {
+ procs_ids_[Hash(i, j, kServer)] =
+ (i * grp_size + j) / cluster_.nservers_per_procs() + offset;
}
}
-
- auto rt = new ZKClusterRT(singa_.zookeeper_host(), job);
- rt->Init();
- cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
-
- hostip_=GetHostIP();
-}
-
-void Cluster::Register(int pid, const string& endpoint) {
- procs_id_=cluster_rt_->RegistProc(endpoint, pid);
- CHECK_GE(procs_id_,0);
- CHECK_LT(procs_id_,nprocs());
- LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint
- << " (pid = " << pid << ")";
-}
-
-const string Cluster::endpoint(int procsid) const {
- CHECK_LT(procsid, nprocs());
- CHECK_GE(procsid, 0);
- if(endpoints_.size())
- return endpoints_.at(procsid);
- else
- return cluster_rt_->GetProcHost(procsid);
+ cluster_rt_ = new ClusterRuntime(singa_.zookeeper_host(), job);
+ cluster_rt_->Init();
+ hostip_ = GetHostIP();
}
-void Cluster::SetupFolders(const ClusterProto &cluster){
+void Cluster::SetupFolders(const ClusterProto &cluster) {
// create visulization folder
mkdir(vis_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
// create checkpoint folder
mkdir(checkpoint_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
}
-shared_ptr<Cluster> Cluster::Get(
- int job, const SingaProto& singaConf, const ClusterProto& clusterConf) {
- instance_.reset(new Cluster(job, singaConf, clusterConf));
- return instance_;
-}
-
-shared_ptr<Cluster> Cluster::Get() {
- if(!instance_) {
- LOG(ERROR)<<"The first call to Get should "
- <<"provide the sys/model conf path";
- }
- return instance_;
-}
int Cluster::Hash(int gid, int id, int flag) {
- int ret=-1;
- if(flag==kServer){
- ret=(flag*cluster_.nserver_groups()+gid)*cluster_.nservers_per_group() + id;
- }else{
- ret=(flag*cluster_.nworker_groups()+gid)*cluster_.nworkers_per_group() + id;
+ int ret = -1;
+ if (flag == kServer) {
+ ret = (flag * cluster_.nserver_groups() + gid)
+ * cluster_.nservers_per_group() + id;
+ } else {
+ ret = (flag * cluster_.nworker_groups() + gid)
+ * cluster_.nworkers_per_group() + id;
}
return ret;
}
-int Cluster::ProcsIDOf(int group_id, int id, int flag) {
- return procs_ids_.at(Hash(group_id, id, flag));
-}
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 454ecfc..5cb670e 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -39,7 +39,7 @@ ZKService::~ZKService() {
zookeeper_close(zkhandle_);
}
-char zk_cxt[] = "ZKClusterRT";
+char zk_cxt[] = "ClusterRuntime";
bool ZKService::Init(const string& host, int timeout) {
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
@@ -174,10 +174,10 @@ void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
}
}
-ZKClusterRT::ZKClusterRT(const string& host, int job_id)
- : ZKClusterRT(host, job_id, 30000) {}
+ClusterRuntime::ClusterRuntime(const string& host, int job_id)
+ : ClusterRuntime(host, job_id, 30000) {}
-ZKClusterRT::ZKClusterRT(const string& host, int job_id, int timeout) {
+ClusterRuntime::ClusterRuntime(const string& host, int job_id, int timeout) {
host_ = host;
timeout_ = timeout;
workspace_ = GetZKJobWorkspace(job_id);
@@ -186,14 +186,14 @@ ZKClusterRT::ZKClusterRT(const string& host, int job_id, int timeout) {
proc_lock_path_ = workspace_ + kZKPathJobPLock;
}
-ZKClusterRT::~ZKClusterRT() {
+ClusterRuntime::~ClusterRuntime() {
// release callback vector
for (RTCallback* p : cb_vec_) {
delete p;
}
}
-bool ZKClusterRT::Init() {
+bool ClusterRuntime::Init() {
if (!zk_.Init(host_, timeout_)) return false;
if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
return false;
@@ -210,7 +210,7 @@ bool ZKClusterRT::Init() {
return true;
}
-int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
+int ClusterRuntime::RegistProc(const string& host_addr, int pid) {
char buf[kZKBufSize];
string lock = proc_lock_path_ + "/lock-";
if (!zk_.CreateNode(lock.c_str(), nullptr,
@@ -245,7 +245,7 @@ int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
return id;
}
-bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
+bool ClusterRuntime::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
CHECK_NOTNULL(fn);
string path = groupPath(gid);
// create zk node
@@ -260,7 +260,7 @@ bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
return zk_.WGetChild(path.c_str(), &child, cb);
}
-std::string ZKClusterRT::GetProcHost(int proc_id) {
+std::string ClusterRuntime::GetProcHost(int proc_id) {
// char buf[kZKBufSize];
char val[kZKBufSize];
// construct file name
@@ -273,13 +273,13 @@ std::string ZKClusterRT::GetProcHost(int proc_id) {
return string(val);
}
-bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
+bool ClusterRuntime::JoinSGroup(int gid, int wid, int s_group) {
string path = groupPath(s_group) + workerPath(gid, wid);
// try to create an ephemeral node under server group path
return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
}
-bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
+bool ClusterRuntime::LeaveSGroup(int gid, int wid, int s_group) {
string path = groupPath(s_group) + workerPath(gid, wid);
return zk_.DeleteNode(path.c_str());
}