You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/08/18 16:13:19 UTC

[2/4] incubator-singa git commit: SINGA-21 Code review 3

SINGA-21 Code review 3

review cluster.h, cluster.cc
  -- change the first Get() of cluster to Setup()
  -- change shared pointers to raw pointers
  -- let cluster be a singleton
  -- remove unused endpoints_ field
  -- format the code

In cluster_rt.h/cc
  -- remove ClusterRuntime interface
  -- rename ZKClusterRT to ClusterRuntime


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/b24f0a32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/b24f0a32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/b24f0a32

Branch: refs/heads/master
Commit: b24f0a32dd55c43b274e021ab4a53011943f83c7
Parents: e28b039
Author: wang sheng <wa...@gmail.com>
Authored: Tue Aug 18 15:39:02 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Tue Aug 18 16:01:08 2015 +0800

----------------------------------------------------------------------
 include/utils/cluster.h    | 158 +++++++++++++++++-----------------------
 include/utils/cluster_rt.h |  95 +++++++++++-------------
 src/proto/common.proto     |   2 +-
 src/trainer/trainer.cc     |   2 +-
 src/trainer/worker.cc      |   1 -
 src/utils/cluster.cc       | 125 +++++++++++++++----------------
 src/utils/cluster_rt.cc    |  22 +++---
 7 files changed, 179 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index e2c979d..be0e0de 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -1,19 +1,16 @@
-#ifndef INCLUDE_UTILS_CLUSTER_H_
-#define INCLUDE_UTILS_CLUSTER_H_
+#ifndef SINGA_UTILS_CLUSTER_H_
+#define SINGA_UTILS_CLUSTER_H_
+
 #include <glog/logging.h>
 #include <string>
-#include <utility>
+#include <unordered_map>
 #include <memory>
 #include <vector>
-#include <unordered_map>
-#include "utils/common.h"
 #include "proto/job.pb.h"
 #include "proto/singa.pb.h"
 #include "utils/cluster_rt.h"
-
-using std::shared_ptr;
-using std::string;
-using std::vector;
+#include "utils/common.h"
+#include "utils/singleton.h"
 
 namespace singa {
 
@@ -24,113 +21,91 @@ namespace singa {
  */
 class Cluster {
  public:
-  static shared_ptr<Cluster> Get();
-  static shared_ptr<Cluster> Get(int job_id,
-      const SingaProto& singaConf, const ClusterProto& clusterConf);
-
-  const int nserver_groups()const{ return cluster_.nserver_groups(); }
-  const int nworker_groups()const { return cluster_.nworker_groups(); }
-  int nworkers_per_group()const {return cluster_.nworkers_per_group();}
-  int nservers_per_group()const {return cluster_.nservers_per_group();}
-  int nworkers_per_procs()const{return cluster_.nworkers_per_procs();}
-  int nservers_per_procs()const{return cluster_.nservers_per_procs();}
-  int nworker_groups_per_server_group() const {
-    if(nserver_groups()==0||nservers_per_group()==0)
+  // Cluster is a global singleton in a process
+  static Cluster* Setup(int job_id, const SingaProto& singaConf,
+                        const ClusterProto& clusterConf);
+  static Cluster* Get();
+
+  inline int nserver_groups() const { return cluster_.nserver_groups(); }
+  inline int nworker_groups() const { return cluster_.nworker_groups(); }
+  inline int nworkers_per_group() const { return cluster_.nworkers_per_group();}
+  inline int nservers_per_group() const { return cluster_.nservers_per_group();}
+  inline int nworkers_per_procs() const { return cluster_.nworkers_per_procs();}
+  inline int nservers_per_procs() const { return cluster_.nservers_per_procs();}
+  inline int nworker_groups_per_server_group() const {
+    if (nserver_groups() == 0 || nservers_per_group() == 0)
       return 1;
     else
-      return cluster_.nworker_groups()/cluster_.nserver_groups();
+      return cluster_.nworker_groups() / cluster_.nserver_groups();
   }
-
   /**
    * @return true if the calling procs has server threads, otherwise false
    */
-  bool has_server()const {
-    if(server_worker_separate()){
+  inline bool has_server() const {
+    if (server_worker_separate()) {
       CHECK_LT(procs_id_, nprocs_);
-      return procs_id_>=nworker_procs();
-    }else
-      return procs_id_<nserver_procs();
+      return procs_id_ >= nworker_procs();
+    } else {
+      return procs_id_ < nserver_procs();
+    }
   }
   /**
    * @return true if the calling procs has worker threads.
    */
-  bool has_worker()const {
-    return procs_id_<nworker_procs();
+  inline bool has_worker() const {
+    return procs_id_ < nworker_procs();
   }
   /**
    * @return global procs id, which starts from 0.
    */
-  int procs_id() const {return procs_id_;}
-  void set_procs_id(int procs_id) {procs_id_ = procs_id;}
-  bool server_worker_separate() const {
+  inline int procs_id() const { return procs_id_; }
+  inline void set_procs_id(int procs_id) { procs_id_ = procs_id; }
+  inline bool server_worker_separate() const {
     return cluster_.server_worker_separate();
   }
-  int nworker_procs() const {
-    return nworker_groups()*nworkers_per_group()/nworkers_per_procs();
-  }
-  int nserver_procs() const {
-    return nserver_groups()*nservers_per_group()/nservers_per_procs();
+  inline int nworker_procs() const {
+    return nworker_groups() * nworkers_per_group() / nworkers_per_procs();
   }
-  int nprocs() const {
-    return nprocs_;
+  inline int nserver_procs() const {
+    return nserver_groups() * nservers_per_group() / nservers_per_procs();
   }
-
-
+  inline int nprocs() const { return nprocs_; }
   /**
    * @return endpoint of the router of a procs with the specified id
    */
-  const string endpoint(const int procs_id) const;
-
-  const string workspace() {return cluster_.workspace();}
-
-  const string vis_folder() const {
-    return cluster_.workspace()+"/visualization";
-  }
-  const string checkpoint_folder() const {
-    return cluster_.workspace()+"/checkpoint";
+  inline std::string endpoint(int procs_id) const {
+    CHECK_LT(procs_id, nprocs());
+    CHECK_GE(procs_id, 0);
+    return cluster_rt_->GetProcHost(procs_id);
   }
-  /*
-  const int stub_timeout() const {
-    return cluster_.stub_timeout();
-  }
-  const int worker_timeout() const {
-    return cluster_.worker_timeout();
+  inline std::string workspace() const { return cluster_.workspace(); }
+  inline std::string vis_folder() const {
+    return cluster_.workspace() + "/visualization";
   }
-  const int server_timeout() const {
-    return cluster_.server_timeout();
+  inline std::string checkpoint_folder() const {
+    return cluster_.workspace() + "/checkpoint";
   }
+  /*
+  const int stub_timeout() const { return cluster_.stub_timeout(); }
+  const int worker_timeout() const { return cluster_.worker_timeout(); }
+  const int server_timeout() const { return cluster_.server_timeout(); }
   */
-
-  const bool server_update() const {
-    return cluster_.server_update();
-  }
-
-  const bool share_memory() const {
-    return cluster_.share_memory();
-  }
-
+  inline bool server_update() const { return cluster_.server_update(); }
+  inline bool share_memory() const { return cluster_.share_memory(); }
   /**
    * bandwidth Bytes/s
    */
-  const int bandwidth() const {
-    return cluster_.bandwidth();
-  }
-
-  const int poll_time() const {
-    return cluster_.poll_time();
-  }
-
-  shared_ptr<ClusterRuntime> runtime() const {
-    return cluster_rt_;
-  }
+  inline int bandwidth() const { return cluster_.bandwidth(); }
+  inline int poll_time() const { return cluster_.poll_time(); }
+  ClusterRuntime* runtime() const { return cluster_rt_; }
 
   /**
    * @return logical procs ID
    */
-  int ProcsIDOf(int group_id, int id, int flag);
-  const string hostip() const {
-    return hostip_;
+  inline int ProcsIDOf(int group_id, int id, int flag) {
+    return procs_ids_.at(Hash(group_id, id, flag));
   }
+  inline std::string hostip() const { return hostip_; }
   /**
    * Register this process.
    *
@@ -138,27 +113,24 @@ class Cluster {
    * logical process ID.
    * @param endpoint unique string for other procs to connect
    */
-  void Register(int pid, const string& endpoint);
+  void Register(int pid, const std::string& endpoint);
 
  private:
-  Cluster(int job, const SingaProto& singaConf, const ClusterProto& clusterConf);
+  void Init(int job, const SingaProto& singaConf,
+          const ClusterProto& clusterConf);
   void SetupFolders(const ClusterProto &cluster);
   int Hash(int gid, int id, int flag);
 
- private:
-  int procs_id_;
-  int nprocs_;
-  string hostip_;
-  std::vector<std::string> endpoints_;
+  int procs_id_ = -1;
+  int nprocs_ = 0;
+  std::string hostip_ = "";
   // cluster config proto
   ClusterProto cluster_;
   SingaProto singa_;
-  shared_ptr<ClusterRuntime> cluster_rt_;
-  // make this class a singlton
-  static shared_ptr<Cluster> instance_;
+  ClusterRuntime* cluster_rt_ = nullptr;
   std::unordered_map<int, int> procs_ids_;
 };
 
 }  // namespace singa
 
-#endif  // INCLUDE_UTILS_CLUSTER_H_
+#endif  // SINGA_UTILS_CLUSTER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 4c93c25..49f6e4d 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -9,46 +9,6 @@ namespace singa {
 
 typedef void (*rt_callback)(void *contest);
 
-/**
- * ClusterRuntime is a runtime service that manages dynamic configuration
- * and status of the whole cluster. It mainly provides following services:
- *    1)  Provide running status of each server/worker
- *    2)  Translate process id to (hostname:port)
- */
-class ClusterRuntime {
- public:
-  virtual ~ClusterRuntime() {}
-  /**
-   * Initialize the runtime instance
-   */
-  virtual bool Init() = 0;
-  /**
-   * register the process, and get a unique process id
-   *
-   * \return the process id, -1 if failed
-   */
-  virtual int RegistProc(const std::string& host_addr, int pid) = 0;
-  /**
-   * translate the process id to host address
-   *
-   * \return the host and port, "" if no such proc id 
-   */
-  virtual std::string GetProcHost(int proc_id) = 0;
-  /**
-   * Server: watch all workers in a server group,
-   * will be notified when all workers have left
-   */
-  virtual bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) = 0;
-  /**
-   * Worker: join a server group (i.e. start to read/update these servers)
-   */
-  virtual bool JoinSGroup(int gid, int wid, int s_group) = 0;
-  /**
-   * Worker: leave a server group (i.e. finish its all work)
-   */
-  virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0;
-};
-
 const int kZKBufSize = 100;
 // following paths are global
 const std::string kZKPathSinga = "/singa";
@@ -78,10 +38,14 @@ struct JobInfo {
   std::string name;
 };
 
+/*
+ * A wrapper for zookeeper service which handles error code and reconnections
+ */
 class ZKService {
  public:
   static void ChildChanges(zhandle_t* zh, int type, int state,
                            const char *path, void* watcherCtx);
+
   ~ZKService();
   bool Init(const std::string& host, int timeout);
   bool CreateNode(const char* path, const char* val, int flag, char* output);
@@ -102,18 +66,47 @@ class ZKService {
   zhandle_t* zkhandle_ = nullptr;
 };
 
-class ZKClusterRT : public ClusterRuntime {
+/**
+ * ClusterRuntime is a runtime service that manages dynamic configuration
+ * and status of the whole cluster. It mainly provides following services:
+ *    1)  Provide running status of each server/worker
+ *    2)  Translate process id to (hostname:port)
+ */
+class ClusterRuntime {
  public:
-  ZKClusterRT(const std::string& host, int job_id);
-  ZKClusterRT(const std::string& host, int job_id, int timeout);
-  ~ZKClusterRT() override;
-
-  bool Init() override;
-  int RegistProc(const std::string& host_addr, int pid) override;
-  std::string GetProcHost(int proc_id) override;
-  bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override;
-  bool JoinSGroup(int gid, int wid, int s_group) override;
-  bool LeaveSGroup(int gid, int wid, int s_group) override;
+  ClusterRuntime(const std::string& host, int job_id);
+  ClusterRuntime(const std::string& host, int job_id, int timeout);
+  ~ClusterRuntime();
+
+  /**
+   * Initialize the runtime instance
+   */
+  bool Init();
+  /**
+   * register the process, and get a unique process id
+   *
+   * \return the process id, -1 if failed
+   */
+  int RegistProc(const std::string& host_addr, int pid);
+  /**
+   * translate the process id to host address
+   *
+   * \return the host and port, "" if no such proc id 
+   */
+  std::string GetProcHost(int proc_id);
+  /**
+   * Server: watch all workers in a server group,
+   * will be notified when all workers have left
+   */
+  bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx);
+  /**
+   * Worker: join a server group (i.e. start to read/update these servers)
+   */
+  bool JoinSGroup(int gid, int wid, int s_group);
+  /**
+   * Worker: leave a server group (i.e. finish its all work)
+   */
+  bool LeaveSGroup(int gid, int wid, int s_group);
 
  private:
   inline std::string groupPath(int gid) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/proto/common.proto
----------------------------------------------------------------------
diff --git a/src/proto/common.proto b/src/proto/common.proto
index 3b6efb3..8927f2b 100644
--- a/src/proto/common.proto
+++ b/src/proto/common.proto
@@ -84,7 +84,7 @@ message SingleLabelImageRecord {
 }
 
 message MetricProto {
-  repeated string name =1;
+  repeated string name = 1;
   repeated int32 count = 2;
   repeated float val = 3;
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index ea060ec..fe90bc4 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -226,7 +226,7 @@ void Trainer::Resume(JobProto* jobConf) {
 
 void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
   // register job to zookeeper at the beginning
-  auto cluster = Cluster::Get(job->id(), singaConf, job->cluster());
+  auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster());
   if (resume)
     Resume(job);
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 6052f3a..3b1617d 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -25,7 +25,6 @@ void Worker::Setup(
   train_net_ = train_net;
   validation_net_ = valid_net;
   test_net_ = test_net;
-  auto cluster = Cluster::Get();
 }
 
 Worker::~Worker() {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 6dad2a8..9664064 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -1,97 +1,86 @@
-#include <glog/logging.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <fstream>
 #include "utils/cluster.h"
-#include "proto/common.pb.h"
+
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <unistd.h>
+#include <fstream>
+
 namespace singa {
 
-std::shared_ptr<Cluster> Cluster::instance_;
-Cluster::Cluster(
-    int job, const SingaProto& singaConf, const ClusterProto& clusterConf) {
+Cluster* Cluster::Setup(int job, const SingaProto& singaConf,
+                        const ClusterProto& clusterConf) {
+  Singleton<Cluster>::Instance()->Init(job, singaConf, clusterConf);
+  return Singleton<Cluster>::Instance();
+}
+
+Cluster* Cluster::Get() {
+  if (!Singleton<Cluster>::Instance()->nprocs_) {
+    LOG(ERROR) << "The first call to Get should "
+               << "provide the sys/model conf path";
+  }
+  return Singleton<Cluster>::Instance();
+}
+
+void Cluster::Register(int pid, const std::string& endpoint) {
+  procs_id_ = cluster_rt_->RegistProc(endpoint, pid);
+  CHECK_GE(procs_id_, 0);
+  CHECK_LT(procs_id_, nprocs());
+  LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint
+             << " (pid = " << pid << ")";
+}
+
+void Cluster::Init(int job, const SingaProto& singaConf,
+                   const ClusterProto& clusterConf) {
   cluster_ = clusterConf;
   singa_ = singaConf;
   SetupFolders(clusterConf);
-  if(server_worker_separate())
-    nprocs_=nworker_procs()+nserver_procs();
+  if (server_worker_separate())
+    nprocs_ = nworker_procs() + nserver_procs();
   else
-    nprocs_=std::max(nworker_procs(), nserver_procs());
+    nprocs_ = std::max(nworker_procs(), nserver_procs());
 
   // locate the process id of every worker/server
-  int ngrps=cluster_.nworker_groups(), grp_size=cluster_.nworkers_per_group();
-  int procs=0;
-  for(int i=0;i<ngrps;i++){
-    for(int j=0;j<grp_size;j++){
-      procs=(i*grp_size+j) / cluster_.nworkers_per_procs();
-      procs_ids_[Hash(i,j,kWorkerLayer)]=procs;
-      procs_ids_[Hash(i,j,kWorkerParam)]=procs;
+  int ngrps = cluster_.nworker_groups();
+  int grp_size = cluster_.nworkers_per_group();
+  int procs = 0;
+  for (int i = 0; i < ngrps; ++i) {
+    for (int j = 0; j < grp_size; ++j) {
+      procs = (i * grp_size + j) / cluster_.nworkers_per_procs();
+      procs_ids_[Hash(i, j, kWorkerLayer)] = procs;
+      procs_ids_[Hash(i, j, kWorkerParam)] = procs;
     }
   }
-  int offset=cluster_.server_worker_separate()? procs+1:0;
-  ngrps=cluster_.nserver_groups(), grp_size=cluster_.nservers_per_group();
-  for(int i=0;i<ngrps;i++){
-    for(int j=0;j<grp_size;j++){
-      procs_ids_[Hash(i,j,kServer)]=(i*grp_size+j) / cluster_.nservers_per_procs()+offset;
+  int offset = cluster_.server_worker_separate() ? procs + 1 : 0;
+  ngrps = cluster_.nserver_groups();
+  grp_size = cluster_.nservers_per_group();
+  for (int i = 0; i < ngrps; ++i) {
+    for (int j = 0; j < grp_size; ++j) {
+      procs_ids_[Hash(i, j, kServer)] =
+          (i * grp_size + j) / cluster_.nservers_per_procs() + offset;
     }
   }
-
-  auto rt = new ZKClusterRT(singa_.zookeeper_host(), job);
-  rt->Init();
-  cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
-
-  hostip_=GetHostIP();
-}
-
-void Cluster::Register(int pid, const string& endpoint) {
-  procs_id_=cluster_rt_->RegistProc(endpoint, pid);
-  CHECK_GE(procs_id_,0);
-  CHECK_LT(procs_id_,nprocs());
-  LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint
-             << " (pid = " << pid << ")";
-}
-
-const string Cluster::endpoint(int procsid) const {
-  CHECK_LT(procsid, nprocs());
-  CHECK_GE(procsid, 0);
-  if(endpoints_.size())
-    return endpoints_.at(procsid);
-  else
-    return cluster_rt_->GetProcHost(procsid);
+  cluster_rt_ = new ClusterRuntime(singa_.zookeeper_host(), job);
+  cluster_rt_->Init();
+  hostip_ = GetHostIP();
 }
 
-void Cluster::SetupFolders(const ClusterProto &cluster){
+void Cluster::SetupFolders(const ClusterProto &cluster) {
   // create visulization folder
   mkdir(vis_folder().c_str(),  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
   // create checkpoint folder
   mkdir(checkpoint_folder().c_str(),  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
 }
 
-shared_ptr<Cluster> Cluster::Get(
-    int job, const SingaProto& singaConf, const ClusterProto& clusterConf) {
-  instance_.reset(new Cluster(job, singaConf, clusterConf));
-  return instance_;
-}
-
-shared_ptr<Cluster> Cluster::Get() {
-  if(!instance_) {
-    LOG(ERROR)<<"The first call to Get should "
-              <<"provide the sys/model conf path";
-  }
-  return instance_;
-}
 int Cluster::Hash(int gid, int id, int flag) {
-  int ret=-1;
-  if(flag==kServer){
-    ret=(flag*cluster_.nserver_groups()+gid)*cluster_.nservers_per_group() + id;
-  }else{
-    ret=(flag*cluster_.nworker_groups()+gid)*cluster_.nworkers_per_group() + id;
+  int ret = -1;
+  if (flag == kServer) {
+    ret = (flag * cluster_.nserver_groups() + gid)
+          * cluster_.nservers_per_group() + id;
+  } else {
+    ret = (flag * cluster_.nworker_groups() + gid)
+          * cluster_.nworkers_per_group() + id;
   }
   return ret;
 }
-int Cluster::ProcsIDOf(int group_id, int id, int flag) {
-  return procs_ids_.at(Hash(group_id, id, flag));
-}
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b24f0a32/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 454ecfc..5cb670e 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -39,7 +39,7 @@ ZKService::~ZKService() {
   zookeeper_close(zkhandle_);
 }
 
-char zk_cxt[] = "ZKClusterRT";
+char zk_cxt[] = "ClusterRuntime";
 
 bool ZKService::Init(const string& host, int timeout) {
   zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
@@ -174,10 +174,10 @@ void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
   }
 }
 
-ZKClusterRT::ZKClusterRT(const string& host, int job_id)
-    : ZKClusterRT(host, job_id, 30000) {}
+ClusterRuntime::ClusterRuntime(const string& host, int job_id)
+    : ClusterRuntime(host, job_id, 30000) {}
 
-ZKClusterRT::ZKClusterRT(const string& host, int job_id, int timeout) {
+ClusterRuntime::ClusterRuntime(const string& host, int job_id, int timeout) {
   host_ = host;
   timeout_ = timeout;
   workspace_ = GetZKJobWorkspace(job_id);
@@ -186,14 +186,14 @@ ZKClusterRT::ZKClusterRT(const string& host, int job_id, int timeout) {
   proc_lock_path_ = workspace_ + kZKPathJobPLock;
 }
 
-ZKClusterRT::~ZKClusterRT() {
+ClusterRuntime::~ClusterRuntime() {
   // release callback vector
   for (RTCallback* p : cb_vec_) {
     delete p;
   }
 }
 
-bool ZKClusterRT::Init() {
+bool ClusterRuntime::Init() {
   if (!zk_.Init(host_, timeout_)) return false;
   if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
     return false;
@@ -210,7 +210,7 @@ bool ZKClusterRT::Init() {
   return true;
 }
 
-int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
+int ClusterRuntime::RegistProc(const string& host_addr, int pid) {
   char buf[kZKBufSize];
   string lock = proc_lock_path_ + "/lock-";
   if (!zk_.CreateNode(lock.c_str(), nullptr,
@@ -245,7 +245,7 @@ int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
   return id;
 }
 
-bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
+bool ClusterRuntime::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
   CHECK_NOTNULL(fn);
   string path = groupPath(gid);
   // create zk node
@@ -260,7 +260,7 @@ bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
   return zk_.WGetChild(path.c_str(), &child, cb);
 }
 
-std::string ZKClusterRT::GetProcHost(int proc_id) {
+std::string ClusterRuntime::GetProcHost(int proc_id) {
   // char buf[kZKBufSize];
   char val[kZKBufSize];
   // construct file name
@@ -273,13 +273,13 @@ std::string ZKClusterRT::GetProcHost(int proc_id) {
   return string(val);
 }
 
-bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
+bool ClusterRuntime::JoinSGroup(int gid, int wid, int s_group) {
   string path = groupPath(s_group) + workerPath(gid, wid);
   // try to create an ephemeral node under server group path
   return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
 }
 
-bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
+bool ClusterRuntime::LeaveSGroup(int gid, int wid, int s_group) {
   string path = groupPath(s_group) + workerPath(gid, wid);
   return zk_.DeleteNode(path.c_str());
 }