You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/07/16 18:52:21 UTC
[2/9] incubator-singa git commit: SINGA-34 Support external zookeeper
service
SINGA-34 Support external zookeeper service
Add a singa tool in src/utils/tool.cc that removes SINGA's metadata (the /singa node and everything under it) from zookeeper.
It is compiled as a separate executable, 'singatool'.
singa-stop.sh no longer stops the zookeeper service or deletes its data directory; it now runs singatool to remove only SINGA's metadata.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/209ba1f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/209ba1f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/209ba1f2
Branch: refs/heads/master
Commit: 209ba1f2f172d8784f1c6219a32507b69ada4541
Parents: 3819e59
Author: wang sheng <wa...@gmail.com>
Authored: Thu Jul 16 21:38:09 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Thu Jul 16 21:56:01 2015 +0800
----------------------------------------------------------------------
.gitignore | 1 +
Makefile.example | 4 +-
bin/singa-run.sh | 6 +-
bin/singa-stop.sh | 7 +-
include/trainer/trainer.h | 1 -
include/utils/cluster.h | 1 -
include/utils/cluster_rt.h | 82 +++++++---
src/proto/cluster.proto | 7 +
src/proto/global.proto | 8 -
src/utils/cluster_rt.cc | 339 ++++++++++++++++++++++------------------
src/utils/tool.cc | 25 +++
11 files changed, 290 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 527972b..be85b4f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,6 +28,7 @@ tmp/
*lmdb
*.binaryproto
singa
+singatool
.libs
*.la
*.deps
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/Makefile.example
----------------------------------------------------------------------
diff --git a/Makefile.example b/Makefile.example
index f2c58fe..2195b91 100644
--- a/Makefile.example
+++ b/Makefile.example
@@ -31,7 +31,7 @@ PROTO_HDRS :=$(patsubst src%, include%, $(PROTOS:.proto=.pb.h))
PROTO_OBJS :=$(addprefix $(BUILD_DIR)/, $(PROTO_SRCS:.cc=.o))
# each singa src file will generate a .o file
-SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" \) \
+SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" -o -path "src/utils/tool.cc" \) \
-prune -o \( -name "*.cc" -type f \) -print )
SINGA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(SINGA_SRCS:.cc=.o)) \
$(PROTO_OBJS) )
@@ -53,6 +53,8 @@ OBJS := $(sort $(SINGA_OBJS) $(TEST_OBJS) )
singa: $(PROTO_OBJS) $(SINGA_OBJS)
$(CXX) $(SINGA_OBJS) src/main.cc -o singa $(CXXFLAGS) $(LDFLAGS)
@echo
+ $(CXX) $(SINGA_OBJS) src/utils/tool.cc -o singatool $(CXXFLAGS) $(LDFLAGS)
+ @echo
loader: proto $(LOADER_OBJS)
$(CXX) $(LOADER_OBJS) -o $(BUILD_DIR)/loader $(CXXFLAGS) $(LDFLAGS)
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/bin/singa-run.sh
----------------------------------------------------------------------
diff --git a/bin/singa-run.sh b/bin/singa-run.sh
index 45be0a1..c548a54 100755
--- a/bin/singa-run.sh
+++ b/bin/singa-run.sh
@@ -61,15 +61,15 @@ BASE=`cd "$BIN/..">/dev/null; pwd`
cd $BASE
-# clenup singa data
-$BIN/singa-stop.sh conf/hostfile
-
# start zookeeper
$BIN/zk-service.sh start 2>/dev/null
# wait for zk service to be up
sleep 3
+# clenup singa data
+$BIN/singa-stop.sh conf/hostfile
+
# check mode
if [ $# = 2 ] ; then
# start single singa process
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/bin/singa-stop.sh
----------------------------------------------------------------------
diff --git a/bin/singa-stop.sh b/bin/singa-stop.sh
index 1b36675..64cde64 100755
--- a/bin/singa-stop.sh
+++ b/bin/singa-stop.sh
@@ -40,6 +40,7 @@ ZKDATA_DIR="/tmp/zookeeper"
PROC_NAME="*singa"
HOST_FILE=$1
+cd $BASE
# kill singa processes
if [ $# = 0 ] ; then
@@ -62,10 +63,10 @@ elif [ $# = 1 ] ; then
done
fi
-# close zookeeper
-. $BIN/zk-service.sh stop 2>/dev/null
+# wait for killall command
+sleep 2
echo cleanning metadata in zookeeper ...
# remove zk data
-rm -r $ZKDATA_DIR
+./singatool
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 0ee01d4..bc81e72 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -2,7 +2,6 @@
#define INCLUDE_TRAINER_TRAINER_H_
#include <unordered_map>
#include "proto/cluster.pb.h"
-#include "proto/global.pb.h"
#include "proto/model.pb.h"
#include "utils/updater.h"
#include "utils/param.h"
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 68ae937..4b87da0 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -9,7 +9,6 @@
#include "utils/common.h"
#include "proto/cluster.pb.h"
#include "utils/cluster_rt.h"
-#include "proto/global.pb.h"
using std::shared_ptr;
using std::string;
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 1b877ec..55ca243 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -49,6 +49,42 @@ class ClusterRuntime {
virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0;
};
+const std::string kZKPathSinga = "/singa";
+const std::string kZKPathStatus = "/singa/status";
+const std::string kZKPathRegist = "/singa/regist";
+const std::string kZKPathRegistProc = "/singa/regist/proc";
+const std::string kZKPathRegistLock = "/singa/regist/lock";
+const int kZKBufSize = 50;
+
+struct RTCallback {
+ rt_callback fn;
+ void* ctx;
+};
+
+class ZKService {
+ public:
+ static void ChildChanges(zhandle_t* zh, int type, int state,
+ const char *path, void* watcherCtx);
+ ~ZKService();
+ bool Init(const std::string& host, int timeout);
+ bool CreateNode(const char* path, const char* val, int flag, char* output);
+ bool DeleteNode(const char* path);
+ bool Exist(const char* path);
+ bool GetNode(const char* path, char* output);
+ bool GetChild(const char* path, std::vector<std::string>* vt);
+ bool WGetChild(const char* path, std::vector<std::string>* vt,
+ RTCallback *cb);
+
+ private:
+ const int kNumRetry = 10;
+ const int kSleepSec = 1;
+
+ static void WatcherGlobal(zhandle_t* zh, int type, int state,
+ const char *path, void* watcherCtx);
+
+ zhandle_t* zkhandle_ = nullptr;
+};
+
class ZKClusterRT : public ClusterRuntime {
public:
explicit ZKClusterRT(const std::string& host);
@@ -63,35 +99,33 @@ class ZKClusterRT : public ClusterRuntime {
bool LeaveSGroup(int gid, int wid, int s_group) override;
private:
- struct RTCallback {
- rt_callback fn;
- void* ctx;
- };
+ inline std::string groupPath(int gid) {
+ return kZKPathStatus + "/sg" + std::to_string(gid);
+ }
+ inline std::string workerPath(int gid, int wid) {
+ return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
+ }
+
+ int timeout_ = 30000;
+ std::string host_ = "";
+ ZKService zk_;
+ std::vector<RTCallback*> cb_vec_;
+};
- const int kMaxBufLen = 50;
- const int kNumRetry = 10;
- const int kSleepSec = 1;
- const std::string kZPathSinga = "/singa";
- const std::string kZPathStatus = "/singa/status";
- const std::string kZPathRegist = "/singa/regist";
- const std::string kZPathRegistProc = "/singa/regist/proc";
- const std::string kZPathRegistLock = "/singa/regist/lock";
+class JobManager {
+ public:
+ explicit JobManager(const std::string& host);
+ JobManager(const std::string& host, int timeout);
- static void WatcherGlobal(zhandle_t* zh, int type, int state,
- const char *path, void* watcherCtx);
- static void ChildChanges(zhandle_t* zh, int type, int state,
- const char *path, void* watcherCtx);
- bool CreateZKNode(const char* path, const char* val, int flag, char* output);
- bool DeleteZKNode(const char* path);
- bool GetZKNode(const char* path, char* output);
- bool GetZKChild(const char* path, std::vector<std::string>* vt);
- inline std::string groupPath(int gid);
- std::string workerPath(int gid, int wid);
+ bool Init();
+ bool Clean();
+
+ private:
+ bool CleanPath(const std::string& path);
int timeout_ = 30000;
std::string host_ = "";
- zhandle_t* zkhandle_ = nullptr;
- std::vector<RTCallback*> cb_vec_;
+ ZKService zk_;
};
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/proto/cluster.proto
----------------------------------------------------------------------
diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto
index 54ce300..7afb866 100644
--- a/src/proto/cluster.proto
+++ b/src/proto/cluster.proto
@@ -1,5 +1,12 @@
package singa;
+message GlobalProto {
+ // ip/hostname:port[,ip/hostname:port]
+ required string zookeeper_host = 1;
+ // if not set, use the default dir of glog
+ optional string log_dir = 2 [default = "/tmp/singa-log/"];
+}
+
message ClusterProto {
optional int32 nworker_groups = 1;
optional int32 nserver_groups = 2;
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/proto/global.proto
----------------------------------------------------------------------
diff --git a/src/proto/global.proto b/src/proto/global.proto
deleted file mode 100644
index 84eb7be..0000000
--- a/src/proto/global.proto
+++ /dev/null
@@ -1,8 +0,0 @@
-package singa;
-
-message GlobalProto {
- // ip/hostname:port[,ip/hostname:port]
- required string zookeeper_host = 1;
- // if not set, use the default dir of glog
- optional string log_dir = 2 [default = "/tmp/singa-log/"];
-}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 6143567..408adde 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -3,147 +3,12 @@
#include <glog/logging.h>
#include <algorithm>
-using std::to_string;
using std::string;
+using std::to_string;
namespace singa {
-ZKClusterRT::ZKClusterRT(const string& host) : ZKClusterRT(host, 30000) {}
-
-ZKClusterRT::ZKClusterRT(const string& host, int timeout) {
- host_ = host;
- timeout_ = timeout;
- zkhandle_ = nullptr;
-}
-
-ZKClusterRT::~ZKClusterRT() {
- // close zookeeper handler
- zookeeper_close(zkhandle_);
- // release callback vector
- for (RTCallback* p : cb_vec_) {
- delete p;
- }
-}
-
-char zk_cxt[] = "ZKClusterRT";
-
-bool ZKClusterRT::Init() {
- zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
- zkhandle_ = zookeeper_init(host_.c_str(), WatcherGlobal, timeout_, 0,
- static_cast<void *>(zk_cxt), 0);
- if (zkhandle_ == NULL) {
- LOG(ERROR) << "Error when connecting to zookeeper servers...";
- LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
- LOG(ERROR) << host_.c_str();
- return false;
- }
- // create kZPathSinga
- if (!CreateZKNode(kZPathSinga.c_str(), nullptr, 0, nullptr))
- return false;
- // create kZPathStatus
- if (!CreateZKNode(kZPathStatus.c_str(), nullptr, 0, nullptr))
- return false;
- // create kZPathRegist
- if (!CreateZKNode(kZPathRegist.c_str(), nullptr, 0, nullptr))
- return false;
- // create kZPathRegistProc
- if (!CreateZKNode(kZPathRegistProc.c_str(), nullptr, 0, nullptr))
- return false;
- // create kZPathRegistLock
- if (!CreateZKNode(kZPathRegistLock.c_str(), nullptr, 0, nullptr))
- return false;
-
- return true;
-}
-
-int ZKClusterRT::RegistProc(const string& host_addr) {
- char buf[kMaxBufLen];
- string lock = kZPathRegistLock+"/lock-";
- if (!CreateZKNode(lock.c_str(), nullptr, ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
- return -1;
- }
- // get all children in lock folder
- std::vector<string> vt;
- if (!GetZKChild(kZPathRegistLock.c_str(), &vt)) {
- return -1;
- }
- // find own position among all locks
- int id = -1;
- std::sort(vt.begin(), vt.end());
- for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
- if (kZPathRegistLock+"/"+vt[i] == buf) {
- id = i;
- break;
- }
- }
- if (id == -1) {
- LOG(ERROR) << "cannot find own node " << buf;
- return -1;
- }
- // create a new node in proc path
- string path = kZPathRegistProc+"/proc-"+to_string(id);
- if (!CreateZKNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL, nullptr)) {
- return -1;
- }
- return id;
-}
-
-string ZKClusterRT::GetProcHost(int proc_id) {
- // char buf[kMaxBufLen];
- char val[kMaxBufLen];
- // construct file name
- string path = kZPathRegistProc+"/proc-"+to_string(proc_id);
- if (!GetZKNode(path.c_str(), val)) return "";
- return string(val);
-}
-
-bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
- CHECK_NOTNULL(fn);
- string path = groupPath(gid);
- // create zk node
- if (!CreateZKNode(path.c_str(), nullptr, 0, nullptr)) return false;
- struct String_vector child;
- // store the callback function and context for later usage
- RTCallback *cb = new RTCallback;
- cb->fn = fn;
- cb->ctx = ctx;
- cb_vec_.push_back(cb);
- // start to watch on the zk node, does not care about the first return value
- int ret = zoo_wget_children(zkhandle_, path.c_str(), ChildChanges,
- cb, &child);
- if (ret != ZOK) {
- LOG(ERROR) << "failed to get child of " << path;
- return false;
- }
- return true;
-}
-
-bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
- string path = groupPath(s_group) + workerPath(gid, wid);
- // try to create an ephemeral node under server group path
- if (!CreateZKNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr)) {
- return false;
- }
- return true;
-}
-
-bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
- string path = groupPath(s_group) + workerPath(gid, wid);
- if (!DeleteZKNode(path.c_str())) return false;
- return true;
-}
-
-void ZKClusterRT::WatcherGlobal(zhandle_t * zh, int type, int state,
- const char *path, void *watcherCtx) {
- if (type == ZOO_SESSION_EVENT) {
- if (state == ZOO_CONNECTED_STATE)
- LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
- else if (state == ZOO_EXPIRED_SESSION_STATE)
- LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
- }
-}
-
-void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state,
+void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
const char *path, void *watcherCtx) {
// check if already callback
RTCallback *cb = static_cast<RTCallback*>(watcherCtx);
@@ -168,14 +33,35 @@ void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state,
}
}
-bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag,
+ZKService::~ZKService() {
+ // close zookeeper handler
+ zookeeper_close(zkhandle_);
+}
+
+char zk_cxt[] = "ZKClusterRT";
+
+bool ZKService::Init(const string& host, int timeout) {
+ zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
+ zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0,
+ static_cast<void *>(zk_cxt), 0);
+ if (zkhandle_ == NULL) {
+ LOG(ERROR) << "Error when connecting to zookeeper servers...";
+ LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
+ LOG(ERROR) << host.c_str();
+ return false;
+ }
+
+ return true;
+}
+
+bool ZKService::CreateNode(const char* path, const char* val, int flag,
char* output) {
- char buf[kMaxBufLen];
+ char buf[kZKBufSize];
int ret = 0;
// send the zk request
for (int i = 0; i < kNumRetry; ++i) {
ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val),
- &ZOO_OPEN_ACL_UNSAFE, flag, buf, kMaxBufLen);
+ &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize);
if (ret == ZNONODE) {
LOG(WARNING) << "zookeeper parent node of " << path
<< " not exist, retry later";
@@ -202,7 +88,7 @@ bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag,
return false;
}
-bool ZKClusterRT::DeleteZKNode(const char* path) {
+bool ZKService::DeleteNode(const char* path) {
int ret = zoo_delete(zkhandle_, path, -1);
if (ret == ZOK) {
LOG(INFO) << "deleted zookeeper node " << path;
@@ -215,9 +101,18 @@ bool ZKClusterRT::DeleteZKNode(const char* path) {
return false;
}
-bool ZKClusterRT::GetZKNode(const char* path, char* output) {
+bool ZKService::Exist(const char* path) {
struct Stat stat;
- int val_len = kMaxBufLen;
+ int ret = zoo_exists(zkhandle_, path, 0, &stat);
+ if (ret == ZOK) return true;
+ else if (ret == ZNONODE) return false;
+ LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
+ return false;
+}
+
+bool ZKService::GetNode(const char* path, char* output) {
+ struct Stat stat;
+ int val_len = kZKBufSize;
int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
if (ret == ZOK) {
output[val_len] = '\0';
@@ -230,24 +125,168 @@ bool ZKClusterRT::GetZKNode(const char* path, char* output) {
return false;
}
-bool ZKClusterRT::GetZKChild(const char* path, std::vector<string>* vt) {
- // get all children in lock folder
+bool ZKService::GetChild(const char* path, std::vector<string>* vt) {
struct String_vector child;
int ret = zoo_get_children(zkhandle_, path, 0, &child);
if (ret == ZOK) {
for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
return true;
}
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+ LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)";
return false;
}
-string ZKClusterRT::groupPath(int gid) {
- return kZPathStatus+"/sg"+to_string(gid);
+bool ZKService::WGetChild(const char* path, std::vector<std::string>* vt,
+ RTCallback *cb) {
+ struct String_vector child;
+ int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
+ if (ret == ZOK) {
+ for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
+ return true;
+ }
+ LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)";
+ return false;
+}
+
+
+void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
+ const char *path, void *watcherCtx) {
+ if (type == ZOO_SESSION_EVENT) {
+ if (state == ZOO_CONNECTED_STATE)
+ LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
+ else if (state == ZOO_EXPIRED_SESSION_STATE)
+ LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
+ }
+}
+
+ZKClusterRT::ZKClusterRT(const string& host) : ZKClusterRT(host, 30000) {}
+
+ZKClusterRT::ZKClusterRT(const string& host, int timeout) {
+ host_ = host;
+ timeout_ = timeout;
}
-string ZKClusterRT::workerPath(int gid, int wid) {
- return "/g"+to_string(gid)+"_w"+to_string(wid);
+ZKClusterRT::~ZKClusterRT() {
+ // release callback vector
+ for (RTCallback* p : cb_vec_) {
+ delete p;
+ }
+}
+
+bool ZKClusterRT::Init() {
+ if (!zk_.Init(host_, timeout_)) return false;
+ // create kZKPathSinga
+ if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
+ return false;
+ // create kZKPathStatus
+ if (!zk_.CreateNode(kZKPathStatus.c_str(), nullptr, 0, nullptr))
+ return false;
+ // create kZKPathRegist
+ if (!zk_.CreateNode(kZKPathRegist.c_str(), nullptr, 0, nullptr))
+ return false;
+ // create kZKPathRegistProc
+ if (!zk_.CreateNode(kZKPathRegistProc.c_str(), nullptr, 0, nullptr))
+ return false;
+ // create kZKPathRegistLock
+ if (!zk_.CreateNode(kZKPathRegistLock.c_str(), nullptr, 0, nullptr))
+ return false;
+ return true;
+}
+
+int ZKClusterRT::RegistProc(const string& host_addr) {
+ char buf[kZKBufSize];
+ string lock = kZKPathRegistLock+"/lock-";
+ if (!zk_.CreateNode(lock.c_str(), nullptr,
+ ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
+ return -1;
+ }
+ // get all children in lock folder
+ std::vector<string> vt;
+ if (!zk_.GetChild(kZKPathRegistLock.c_str(), &vt)) {
+ return -1;
+ }
+ // find own position among all locks
+ int id = -1;
+ std::sort(vt.begin(), vt.end());
+ for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
+ if (kZKPathRegistLock+"/"+vt[i] == buf) {
+ id = i;
+ break;
+ }
+ }
+ if (id == -1) {
+ LOG(ERROR) << "cannot find own node " << buf;
+ return -1;
+ }
+ // create a new node in proc path
+ string path = kZKPathRegistProc+"/proc-"+to_string(id);
+ if (!zk_.CreateNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL,
+ nullptr)) {
+ return -1;
+ }
+ return id;
+}
+
+bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
+ CHECK_NOTNULL(fn);
+ string path = groupPath(gid);
+ // create zk node
+ if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
+ std::vector<string> child;
+ // store the callback function and context for later usage
+ RTCallback *cb = new RTCallback;
+ cb->fn = fn;
+ cb->ctx = ctx;
+ cb_vec_.push_back(cb);
+ // start to watch on the zk node, does not care about the first return value
+ return zk_.WGetChild(path.c_str(), &child, cb);
+}
+
+string ZKClusterRT::GetProcHost(int proc_id) {
+ // char buf[kZKBufSize];
+ char val[kZKBufSize];
+ // construct file name
+ string path = kZKPathRegistProc+"/proc-"+to_string(proc_id);
+ if (!zk_.GetNode(path.c_str(), val)) return "";
+ return string(val);
+}
+
+bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
+ string path = groupPath(s_group) + workerPath(gid, wid);
+ // try to create an ephemeral node under server group path
+ return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
+}
+
+bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
+ string path = groupPath(s_group) + workerPath(gid, wid);
+ return zk_.DeleteNode(path.c_str());
+}
+
+JobManager::JobManager(const string& host) : JobManager(host, 30000) {}
+
+JobManager::JobManager(const string& host, int timeout) {
+ host_ = host;
+ timeout_ = timeout;
+}
+
+bool JobManager::Init() {
+ return zk_.Init(host_, timeout_);
+}
+
+bool JobManager::Clean() {
+ if (zk_.Exist(kZKPathSinga.c_str())) {
+ return CleanPath(kZKPathSinga.c_str());
+ }
+ return true;
+}
+
+bool JobManager::CleanPath(const std::string& path) {
+ std::vector<string> child;
+ if (!zk_.GetChild(path.c_str(), &child)) return false;
+ for (string c : child) {
+ if (!CleanPath(path + "/" + c)) return false;
+ }
+ return zk_.DeleteNode(path.c_str());
}
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/utils/tool.cc
----------------------------------------------------------------------
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
new file mode 100644
index 0000000..37ce729
--- /dev/null
+++ b/src/utils/tool.cc
@@ -0,0 +1,25 @@
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "proto/cluster.pb.h"
+#include "utils/cluster_rt.h"
+#include "utils/common.h"
+
+DEFINE_string(global, "conf/singa.conf", "Global config file");
+
+int main(int argc, char **argv) {
+ google::InitGoogleLogging(argv[0]);
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ singa::GlobalProto global;
+ singa::ReadProtoFromTextFile(FLAGS_global.c_str(), &global);
+ singa::SetupLog(global.log_dir(), "SingaTool");
+
+ LOG(INFO) << "The global config is \n" << global.DebugString();
+
+ singa::JobManager mng(global.zookeeper_host());
+ int ret = 0;
+ if (!mng.Init()) ret = 1;
+ if (!mng.Clean()) ret = 1;
+ if (ret) LOG(ERROR) << "errors in SingaTool!";
+ return ret;
+}