You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/07/16 18:52:21 UTC

[2/9] incubator-singa git commit: SINGA-34 Support external zookeeper service

SINGA-34 Support external zookeeper service

Add a SINGA tool (src/utils/tool.cc) that removes SINGA's metadata (the /singa subtree) from zookeeper.
It is compiled into an executable named 'singatool'.
singa-stop.sh now runs singatool instead of shutting down the whole zookeeper service.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/209ba1f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/209ba1f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/209ba1f2

Branch: refs/heads/master
Commit: 209ba1f2f172d8784f1c6219a32507b69ada4541
Parents: 3819e59
Author: wang sheng <wa...@gmail.com>
Authored: Thu Jul 16 21:38:09 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Thu Jul 16 21:56:01 2015 +0800

----------------------------------------------------------------------
 .gitignore                 |   1 +
 Makefile.example           |   4 +-
 bin/singa-run.sh           |   6 +-
 bin/singa-stop.sh          |   7 +-
 include/trainer/trainer.h  |   1 -
 include/utils/cluster.h    |   1 -
 include/utils/cluster_rt.h |  82 +++++++---
 src/proto/cluster.proto    |   7 +
 src/proto/global.proto     |   8 -
 src/utils/cluster_rt.cc    | 339 ++++++++++++++++++++++------------------
 src/utils/tool.cc          |  25 +++
 11 files changed, 290 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 527972b..be85b4f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,6 +28,7 @@ tmp/
 *lmdb
 *.binaryproto
 singa
+singatool
 .libs
 *.la
 *.deps

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/Makefile.example
----------------------------------------------------------------------
diff --git a/Makefile.example b/Makefile.example
index f2c58fe..2195b91 100644
--- a/Makefile.example
+++ b/Makefile.example
@@ -31,7 +31,7 @@ PROTO_HDRS :=$(patsubst src%, include%, $(PROTOS:.proto=.pb.h))
 PROTO_OBJS :=$(addprefix $(BUILD_DIR)/, $(PROTO_SRCS:.cc=.o))
 
 # each singa src file will generate a .o file
-SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" \) \
+SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" -o -path "src/utils/tool.cc" \) \
 	-prune -o \( -name "*.cc" -type f \) -print )
 SINGA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(SINGA_SRCS:.cc=.o)) \
 	$(PROTO_OBJS) )
@@ -53,6 +53,8 @@ OBJS := $(sort $(SINGA_OBJS) $(TEST_OBJS) )
 singa: $(PROTO_OBJS) $(SINGA_OBJS)
 	$(CXX) $(SINGA_OBJS) src/main.cc -o singa $(CXXFLAGS) $(LDFLAGS)
 	@echo
+	$(CXX) $(SINGA_OBJS) src/utils/tool.cc -o singatool $(CXXFLAGS) $(LDFLAGS)
+	@echo
 
 loader: proto $(LOADER_OBJS)
 	$(CXX) $(LOADER_OBJS) -o $(BUILD_DIR)/loader $(CXXFLAGS) $(LDFLAGS)

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/bin/singa-run.sh
----------------------------------------------------------------------
diff --git a/bin/singa-run.sh b/bin/singa-run.sh
index 45be0a1..c548a54 100755
--- a/bin/singa-run.sh
+++ b/bin/singa-run.sh
@@ -61,15 +61,15 @@ BASE=`cd "$BIN/..">/dev/null; pwd`
 
 cd $BASE
 
-# clenup singa data
-$BIN/singa-stop.sh conf/hostfile
-
 # start zookeeper
 $BIN/zk-service.sh start 2>/dev/null
 
 # wait for zk service to be up
 sleep 3
 
+# clean up singa data
+$BIN/singa-stop.sh conf/hostfile
+
 # check mode
 if [ $# = 2 ] ; then
   # start single singa process

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/bin/singa-stop.sh
----------------------------------------------------------------------
diff --git a/bin/singa-stop.sh b/bin/singa-stop.sh
index 1b36675..64cde64 100755
--- a/bin/singa-stop.sh
+++ b/bin/singa-stop.sh
@@ -40,6 +40,7 @@ ZKDATA_DIR="/tmp/zookeeper"
 PROC_NAME="*singa"
 HOST_FILE=$1
 
+cd $BASE
 
 # kill singa processes
 if [ $# = 0 ] ; then
@@ -62,10 +63,10 @@ elif [ $# = 1 ] ; then
   done
 fi
 
-# close zookeeper
-. $BIN/zk-service.sh stop 2>/dev/null
+# wait for killall command
+sleep 2
 
 echo cleanning metadata in zookeeper ...
 # remove zk data
-rm -r $ZKDATA_DIR
+./singatool
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 0ee01d4..bc81e72 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -2,7 +2,6 @@
 #define INCLUDE_TRAINER_TRAINER_H_
 #include <unordered_map>
 #include "proto/cluster.pb.h"
-#include "proto/global.pb.h"
 #include "proto/model.pb.h"
 #include "utils/updater.h"
 #include "utils/param.h"

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 68ae937..4b87da0 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -9,7 +9,6 @@
 #include "utils/common.h"
 #include "proto/cluster.pb.h"
 #include "utils/cluster_rt.h"
-#include "proto/global.pb.h"
 
 using std::shared_ptr;
 using std::string;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 1b877ec..55ca243 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -49,6 +49,47 @@ class ClusterRuntime {
   virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0;
 };
 
+const std::string kZKPathSinga = "/singa";
+const std::string kZKPathStatus = "/singa/status";
+const std::string kZKPathRegist = "/singa/regist";
+const std::string kZKPathRegistProc = "/singa/regist/proc";
+const std::string kZKPathRegistLock = "/singa/regist/lock";
+const int kZKBufSize = 50;
+
+struct RTCallback {
+  rt_callback fn;
+  void* ctx;
+};
+
+class ZKService {
+ public:
+  static void ChildChanges(zhandle_t* zh, int type, int state,
+                           const char *path, void* watcherCtx);
+  ZKService() = default;
+  // ZKService owns zkhandle_ and its destructor closes it, so a copy would
+  // close the same handle twice; forbid copying.
+  ZKService(const ZKService&) = delete;
+  ZKService& operator=(const ZKService&) = delete;
+  ~ZKService();
+  bool Init(const std::string& host, int timeout);
+  bool CreateNode(const char* path, const char* val, int flag, char* output);
+  bool DeleteNode(const char* path);
+  bool Exist(const char* path);
+  bool GetNode(const char* path, char* output);
+  bool GetChild(const char* path, std::vector<std::string>* vt);
+  bool WGetChild(const char* path, std::vector<std::string>* vt,
+                 RTCallback *cb);
+
+ private:
+  const int kNumRetry = 10;
+  const int kSleepSec = 1;
+
+  static void WatcherGlobal(zhandle_t* zh, int type, int state,
+                            const char *path, void* watcherCtx);
+
+  zhandle_t* zkhandle_ = nullptr;
+};
+
 class ZKClusterRT : public ClusterRuntime {
  public:
   explicit ZKClusterRT(const std::string& host);
@@ -63,35 +104,33 @@ class ZKClusterRT : public ClusterRuntime {
   bool LeaveSGroup(int gid, int wid, int s_group) override;
 
  private:
-  struct RTCallback {
-    rt_callback fn;
-    void* ctx;
-  };
+  inline std::string groupPath(int gid) {
+    return kZKPathStatus + "/sg" + std::to_string(gid);
+  }
+  inline std::string workerPath(int gid, int wid) {
+    return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
+  }
+
+  int timeout_ = 30000;
+  std::string host_ = "";
+  ZKService zk_;
+  std::vector<RTCallback*> cb_vec_;
+};
 
-  const int kMaxBufLen = 50;
-  const int kNumRetry = 10;
-  const int kSleepSec = 1;
-  const std::string kZPathSinga = "/singa";
-  const std::string kZPathStatus = "/singa/status";
-  const std::string kZPathRegist = "/singa/regist";
-  const std::string kZPathRegistProc = "/singa/regist/proc";
-  const std::string kZPathRegistLock = "/singa/regist/lock";
+class JobManager {
+ public:
+  explicit JobManager(const std::string& host);
+  JobManager(const std::string& host, int timeout);
 
-  static void WatcherGlobal(zhandle_t* zh, int type, int state,
-                            const char *path, void* watcherCtx);
-  static void ChildChanges(zhandle_t* zh, int type, int state,
-                           const char *path, void* watcherCtx);
-  bool CreateZKNode(const char* path, const char* val, int flag, char* output);
-  bool DeleteZKNode(const char* path);
-  bool GetZKNode(const char* path, char* output);
-  bool GetZKChild(const char* path, std::vector<std::string>* vt);
-  inline std::string groupPath(int gid);
-  std::string workerPath(int gid, int wid);
+  bool Init();
+  bool Clean();
+
+ private:
+  bool CleanPath(const std::string& path);
 
   int timeout_ = 30000;
   std::string host_ = "";
-  zhandle_t* zkhandle_ = nullptr;
-  std::vector<RTCallback*> cb_vec_;
+  ZKService zk_;
 };
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/proto/cluster.proto
----------------------------------------------------------------------
diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto
index 54ce300..7afb866 100644
--- a/src/proto/cluster.proto
+++ b/src/proto/cluster.proto
@@ -1,5 +1,12 @@
 package singa;
 
+message GlobalProto {
+  // ip/hostname:port[,ip/hostname:port]
+  required string zookeeper_host = 1;
+  // if not set, use the default dir of glog
+  optional string log_dir = 2 [default = "/tmp/singa-log/"];
+}
+
 message ClusterProto {
   optional int32 nworker_groups = 1;
   optional int32 nserver_groups = 2;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/proto/global.proto
----------------------------------------------------------------------
diff --git a/src/proto/global.proto b/src/proto/global.proto
deleted file mode 100644
index 84eb7be..0000000
--- a/src/proto/global.proto
+++ /dev/null
@@ -1,8 +0,0 @@
-package singa;
-
-message GlobalProto {
-  // ip/hostname:port[,ip/hostname:port]
-  required string zookeeper_host = 1;
-  // if not set, use the default dir of glog
-  optional string log_dir = 2 [default = "/tmp/singa-log/"];
-}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 6143567..408adde 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -3,147 +3,12 @@
 #include <glog/logging.h>
 #include <algorithm>
 
-using std::to_string;
 using std::string;
+using std::to_string;
 
 namespace singa {
 
-ZKClusterRT::ZKClusterRT(const string& host) : ZKClusterRT(host, 30000) {}
-
-ZKClusterRT::ZKClusterRT(const string& host, int timeout) {
-  host_ = host;
-  timeout_ = timeout;
-  zkhandle_ = nullptr;
-}
-
-ZKClusterRT::~ZKClusterRT() {
-  // close zookeeper handler
-  zookeeper_close(zkhandle_);
-  // release callback vector
-  for (RTCallback* p : cb_vec_) {
-    delete p;
-  }
-}
-
-char zk_cxt[] = "ZKClusterRT";
-
-bool ZKClusterRT::Init() {
-  zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
-  zkhandle_ = zookeeper_init(host_.c_str(), WatcherGlobal, timeout_, 0,
-                             static_cast<void *>(zk_cxt), 0);
-  if (zkhandle_ == NULL) {
-    LOG(ERROR) << "Error when connecting to zookeeper servers...";
-    LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
-    LOG(ERROR) << host_.c_str();
-    return false;
-  }
-  // create kZPathSinga
-  if (!CreateZKNode(kZPathSinga.c_str(), nullptr, 0, nullptr))
-    return false;
-  // create kZPathStatus
-  if (!CreateZKNode(kZPathStatus.c_str(), nullptr, 0, nullptr))
-    return false;
-  // create kZPathRegist
-  if (!CreateZKNode(kZPathRegist.c_str(), nullptr, 0, nullptr))
-    return false;
-  // create kZPathRegistProc
-  if (!CreateZKNode(kZPathRegistProc.c_str(), nullptr, 0, nullptr))
-    return false;
-  // create kZPathRegistLock
-  if (!CreateZKNode(kZPathRegistLock.c_str(), nullptr, 0, nullptr))
-    return false;
-
-  return true;
-}
-
-int ZKClusterRT::RegistProc(const string& host_addr) {
-  char buf[kMaxBufLen];
-  string lock = kZPathRegistLock+"/lock-";
-  if (!CreateZKNode(lock.c_str(), nullptr, ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
-    return -1;
-  }
-  // get all children in lock folder
-  std::vector<string> vt;
-  if (!GetZKChild(kZPathRegistLock.c_str(), &vt)) {
-    return -1;
-  }
-  // find own position among all locks
-  int id = -1;
-  std::sort(vt.begin(), vt.end());
-  for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
-    if (kZPathRegistLock+"/"+vt[i] == buf) {
-      id = i;
-      break;
-    }
-  }
-  if (id == -1) {
-    LOG(ERROR) << "cannot find own node " << buf;
-    return -1;
-  }
-  // create a new node in proc path
-  string path = kZPathRegistProc+"/proc-"+to_string(id);
-  if (!CreateZKNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL, nullptr)) {
-    return -1;
-  }
-  return id;
-}
-
-string ZKClusterRT::GetProcHost(int proc_id) {
-  // char buf[kMaxBufLen];
-  char val[kMaxBufLen];
-  // construct file name
-  string path = kZPathRegistProc+"/proc-"+to_string(proc_id);
-  if (!GetZKNode(path.c_str(), val)) return "";
-  return string(val);
-}
-
-bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
-  CHECK_NOTNULL(fn);
-  string path = groupPath(gid);
-  // create zk node
-  if (!CreateZKNode(path.c_str(), nullptr, 0, nullptr)) return false;
-  struct String_vector child;
-  // store the callback function and context for later usage
-  RTCallback *cb = new RTCallback;
-  cb->fn = fn;
-  cb->ctx = ctx;
-  cb_vec_.push_back(cb);
-  // start to watch on the zk node, does not care about the first return value
-  int ret = zoo_wget_children(zkhandle_, path.c_str(), ChildChanges,
-                              cb, &child);
-  if (ret != ZOK) {
-    LOG(ERROR) << "failed to get child of " << path;
-    return false;
-  }
-  return true;
-}
-
-bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
-  string path = groupPath(s_group) + workerPath(gid, wid);
-  // try to create an ephemeral node under server group path
-  if (!CreateZKNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr)) {
-    return false;
-  }
-  return true;
-}
-
-bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
-  string path = groupPath(s_group) + workerPath(gid, wid);
-  if (!DeleteZKNode(path.c_str())) return false;
-  return true;
-}
-
-void ZKClusterRT::WatcherGlobal(zhandle_t * zh, int type, int state,
-                                const char *path, void *watcherCtx) {
-  if (type == ZOO_SESSION_EVENT) {
-    if (state == ZOO_CONNECTED_STATE)
-      LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
-    else if (state == ZOO_EXPIRED_SESSION_STATE)
-      LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
-  }
-}
-
-void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state,
+void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
                                const char *path, void *watcherCtx) {
   // check if already callback
   RTCallback *cb = static_cast<RTCallback*>(watcherCtx);
@@ -168,14 +33,35 @@ void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state,
   }
 }
 
-bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag,
+ZKService::~ZKService() {
+  // close zookeeper handler
+  zookeeper_close(zkhandle_);
+}
+
+char zk_cxt[] = "ZKClusterRT";
+
+bool ZKService::Init(const string& host, int timeout) {
+  zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
+  zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0,
+                             static_cast<void *>(zk_cxt), 0);
+  if (zkhandle_ == NULL) {
+    LOG(ERROR) << "Error when connecting to zookeeper servers...";
+    LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
+    LOG(ERROR) << host.c_str();
+    return false;
+  }
+
+  return true;
+}
+
+bool ZKService::CreateNode(const char* path, const char* val, int flag,
                                char* output) {
-  char buf[kMaxBufLen];
+  char buf[kZKBufSize];
   int ret = 0;
   // send the zk request
   for (int i = 0; i < kNumRetry; ++i) {
     ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val),
-                     &ZOO_OPEN_ACL_UNSAFE, flag, buf, kMaxBufLen);
+                     &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize);
     if (ret == ZNONODE) {
       LOG(WARNING) << "zookeeper parent node of " << path
                   << " not exist, retry later";
@@ -202,7 +88,7 @@ bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag,
   return false;
 }
 
-bool ZKClusterRT::DeleteZKNode(const char* path) {
+bool ZKService::DeleteNode(const char* path) {
   int ret = zoo_delete(zkhandle_, path, -1);
   if (ret == ZOK) {
     LOG(INFO) << "deleted zookeeper node " << path;
@@ -215,9 +101,18 @@ bool ZKClusterRT::DeleteZKNode(const char* path) {
   return false;
 }
 
-bool ZKClusterRT::GetZKNode(const char* path, char* output) {
+bool ZKService::Exist(const char* path) {
   struct Stat stat;
-  int val_len = kMaxBufLen;
+  int ret = zoo_exists(zkhandle_, path, 0, &stat);
+  if (ret == ZOK) return true;
+  else if (ret == ZNONODE) return false;
+  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
+  return false;
+}
+
+bool ZKService::GetNode(const char* path, char* output) {
+  struct Stat stat;
+  int val_len = kZKBufSize - 1;  // keep one byte for the '\0' written below
   int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
   if (ret == ZOK) {
     output[val_len] = '\0';
@@ -230,24 +125,172 @@ bool ZKClusterRT::GetZKNode(const char* path, char* output) {
   return false;
 }
 
-bool ZKClusterRT::GetZKChild(const char* path, std::vector<string>* vt) {
-  // get all children in lock folder
+bool ZKService::GetChild(const char* path, std::vector<string>* vt) {
   struct String_vector child;
   int ret = zoo_get_children(zkhandle_, path, 0, &child);
   if (ret == ZOK) {
-    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
-    return true;
+    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
+    // zoo_get_children() allocates child.data; free it after copying
+    deallocate_String_vector(&child);
+    return true;
   }
-  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)";
   return false;
 }
 
-string ZKClusterRT::groupPath(int gid) {
-  return kZPathStatus+"/sg"+to_string(gid);
+bool ZKService::WGetChild(const char* path, std::vector<std::string>* vt,
+                          RTCallback *cb) {
+  struct String_vector child;
+  int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
+  if (ret == ZOK) {
+    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
+    // zoo_wget_children() allocates child.data; free it after copying
+    deallocate_String_vector(&child);
+    return true;
+  }
+  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_wget_children)";
+  return false;
+}
+
+
+void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
+                                const char *path, void *watcherCtx) {
+  if (type == ZOO_SESSION_EVENT) {
+    if (state == ZOO_CONNECTED_STATE)
+      LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
+    else if (state == ZOO_EXPIRED_SESSION_STATE)
+      LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
+  }
+}
+
+ZKClusterRT::ZKClusterRT(const string& host) : ZKClusterRT(host, 30000) {}
+
+ZKClusterRT::ZKClusterRT(const string& host, int timeout) {
+  host_ = host;
+  timeout_ = timeout;
 }
 
-string ZKClusterRT::workerPath(int gid, int wid) {
-  return "/g"+to_string(gid)+"_w"+to_string(wid);
+ZKClusterRT::~ZKClusterRT() {
+  // release callback vector
+  for (RTCallback* p : cb_vec_) {
+    delete p;
+  }
+}
+
+bool ZKClusterRT::Init() {
+  if (!zk_.Init(host_, timeout_)) return false;
+  // create kZKPathSinga
+  if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
+    return false;
+  // create kZKPathStatus
+  if (!zk_.CreateNode(kZKPathStatus.c_str(), nullptr, 0, nullptr))
+    return false;
+  // create kZKPathRegist
+  if (!zk_.CreateNode(kZKPathRegist.c_str(), nullptr, 0, nullptr))
+    return false;
+  // create kZKPathRegistProc
+  if (!zk_.CreateNode(kZKPathRegistProc.c_str(), nullptr, 0, nullptr))
+    return false;
+  // create kZKPathRegistLock
+  if (!zk_.CreateNode(kZKPathRegistLock.c_str(), nullptr, 0, nullptr))
+    return false;
+  return true;
+}
+
+int ZKClusterRT::RegistProc(const string& host_addr) {
+  char buf[kZKBufSize];
+  string lock = kZKPathRegistLock+"/lock-";
+  if (!zk_.CreateNode(lock.c_str(), nullptr,
+                        ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
+    return -1;
+  }
+  // get all children in lock folder
+  std::vector<string> vt;
+  if (!zk_.GetChild(kZKPathRegistLock.c_str(), &vt)) {
+    return -1;
+  }
+  // find own position among all locks
+  int id = -1;
+  std::sort(vt.begin(), vt.end());
+  for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
+    if (kZKPathRegistLock+"/"+vt[i] == buf) {
+      id = i;
+      break;
+    }
+  }
+  if (id == -1) {
+    LOG(ERROR) << "cannot find own node " << buf;
+    return -1;
+  }
+  // create a new node in proc path
+  string path = kZKPathRegistProc+"/proc-"+to_string(id);
+  if (!zk_.CreateNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL,
+                      nullptr)) {
+    return -1;
+  }
+  return id;
+}
+
+bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
+  CHECK_NOTNULL(fn);
+  string path = groupPath(gid);
+  // create zk node
+  if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
+  std::vector<string> child;
+  // store the callback function and context for later usage
+  RTCallback *cb = new RTCallback;
+  cb->fn = fn;
+  cb->ctx = ctx;
+  cb_vec_.push_back(cb);
+  // start to watch on the zk node, does not care about the first return value
+  return zk_.WGetChild(path.c_str(), &child, cb);
+}
+
+string ZKClusterRT::GetProcHost(int proc_id) {
+  // char buf[kZKBufSize];
+  char val[kZKBufSize];
+  // construct file name
+  string path = kZKPathRegistProc+"/proc-"+to_string(proc_id);
+  if (!zk_.GetNode(path.c_str(), val)) return "";
+  return string(val);
+}
+
+bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
+  string path = groupPath(s_group) + workerPath(gid, wid);
+  // try to create an ephemeral node under server group path
+  return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
+}
+
+bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
+  string path = groupPath(s_group) + workerPath(gid, wid);
+  return zk_.DeleteNode(path.c_str());
+}
+
+JobManager::JobManager(const string& host) : JobManager(host, 30000) {}
+
+JobManager::JobManager(const string& host, int timeout) {
+  host_ = host;
+  timeout_ = timeout;
+}
+
+bool JobManager::Init() {
+  return zk_.Init(host_, timeout_);
+}
+
+bool JobManager::Clean() {
+  if (zk_.Exist(kZKPathSinga.c_str())) {
+    return CleanPath(kZKPathSinga);
+  }
+  return true;
+}
+
+bool JobManager::CleanPath(const std::string& path) {
+  std::vector<string> child;
+  if (!zk_.GetChild(path.c_str(), &child)) return false;
+  for (const string& c : child) {  // remove children before the node itself
+    if (!CleanPath(path + "/" + c)) return false;
+  }
+  return zk_.DeleteNode(path.c_str());
 }
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/utils/tool.cc
----------------------------------------------------------------------
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
new file mode 100644
index 0000000..37ce729
--- /dev/null
+++ b/src/utils/tool.cc
@@ -0,0 +1,27 @@
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "proto/cluster.pb.h"
+#include "utils/cluster_rt.h"
+#include "utils/common.h"
+
+DEFINE_string(global, "conf/singa.conf", "Global config file");
+
+int main(int argc, char **argv) {
+  google::InitGoogleLogging(argv[0]);
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  singa::GlobalProto global;
+  singa::ReadProtoFromTextFile(FLAGS_global.c_str(), &global);
+  singa::SetupLog(global.log_dir(), "SingaTool");
+
+  LOG(INFO) << "The global config is \n" << global.DebugString();
+
+  singa::JobManager mng(global.zookeeper_host());
+  // Clean() issues zookeeper requests, so skip it when Init() could not
+  // create a zookeeper handle (it would only log misleading errors).
+  if (!mng.Init() || !mng.Clean()) {
+    LOG(ERROR) << "errors in SingaTool!";
+    return 1;
+  }
+  return 0;
+}