You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/06/17 10:00:35 UTC
[3/4] incubator-singa git commit: SINGA-16 Runtime Process id
Management
SINGA-16 Runtime Process id Management
Add two new features to the ClusterRuntime class:
-- int RegistProc(const string& host_addr)
-- string GetProcHost(int proc_id)
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/da6e4dce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/da6e4dce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/da6e4dce
Branch: refs/heads/master
Commit: da6e4dce8ad6be5bd851425e539e350ea9db5173
Parents: 017b042
Author: wang sheng <wa...@gmail.com>
Authored: Wed Jun 17 14:02:59 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Wed Jun 17 14:08:11 2015 +0800
----------------------------------------------------------------------
include/utils/cluster_rt.h | 37 ++++--
src/test/test_cluster.cc | 23 +++-
src/utils/cluster_rt.cc | 266 ++++++++++++++++++++++++++--------------
3 files changed, 224 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 7b5f361..7ed7b68 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -15,7 +15,7 @@ namespace singa {
* ClusterRuntime is a runtime service that manages dynamic configuration and status
* of the whole cluster. It mainly provides following services:
* 1) Provide running status of each server/worker
- * 1) Translate process id to (hostname:port)
+ * 2) Translate process id to (hostname:port)
*/
typedef void (*rt_callback)(void *contest);
@@ -31,6 +31,20 @@ class ClusterRuntime{
virtual bool Init(){ return false;}
/**
+ * register the process, and get a unique process id
+ *
+ * \return the process id, -1 if failed
+ */
+ virtual int RegistProc(const string& host_addr){ return -1;};
+
+ /**
+ * translate the process id to host address
+ *
+ * \return the host and port, "" if no such proc id
+ */
+ virtual string GetProcHost(int proc_id){ return "";};
+
+ /**
* Server: watch all workers in a server group, will be notified when all workers have left
*/
virtual bool sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){ return false;}
@@ -53,15 +67,21 @@ class ZKClusterRT : public ClusterRuntime{
ZKClusterRT(string host, int timeout = 30000);
~ZKClusterRT();
bool Init();
+ int RegistProc(const string& host_addr);
+ string GetProcHost(int proc_id);
bool sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx);
bool wJoinSGroup(int gid, int wid, int s_group);
bool wLeaveSGroup(int gid, int wid, int s_group);
- static void watcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx);
private:
- static void childChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
- string getSGroupPath(int gid);
- string getWorkerPath(int gid, int wid);
+ static void WatcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx);
+ static void ChildChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
+ bool CreateZKNode(const char* path, const char* val, int flag, char* output);
+ bool DeleteZKNode(const char* path);
+ bool GetZKNode(const char* path, char* output);
+ bool GetZKChild(const char* path, vector<string>& vt);
+ string groupPath(int gid);
+ string workerPath(int gid, int wid);
struct RTCallback{
rt_callback fn;
@@ -76,8 +96,11 @@ class ZKClusterRT : public ClusterRuntime{
const int MAX_BUF_LEN = 50;
const int RETRY_NUM = 10;
const int SLEEP_SEC = 1;
- const string ZK_P_SINGA = "/singa";
- const string ZK_P_STATUS = "/status";
+ const string ZPATH_SINGA = "/singa";
+ const string ZPATH_STATUS = "/singa/status";
+ const string ZPATH_REGIST = "/singa/regist";
+ const string ZPATH_REGIST_PROC = "/singa/regist/proc";
+ const string ZPATH_REGIST_LOCK = "/singa/regist/lock";
};
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/src/test/test_cluster.cc
----------------------------------------------------------------------
diff --git a/src/test/test_cluster.cc b/src/test/test_cluster.cc
index bb17149..b16d765 100644
--- a/src/test/test_cluster.cc
+++ b/src/test/test_cluster.cc
@@ -5,7 +5,7 @@
using namespace singa;
-string folder="src/test/data/";
+//string folder="src/test/data/";
string host="localhost:2181";
@@ -13,12 +13,15 @@ void zk_cb(void *contest){
LOG(INFO) << "zk callback: " << (char *)contest;
}
-TEST(CluserRuntimeTest, ZooKeeper){
+TEST(CluserRuntimeTest, GroupManagement){
ClusterRuntime* rt = new ZKClusterRT(host);
ASSERT_EQ(rt->Init(), true);
+
ASSERT_EQ(rt->sWatchSGroup(1, 1, zk_cb, "test call back"), true);
+
ASSERT_EQ(rt->wJoinSGroup(1, 1, 1), true);
ASSERT_EQ(rt->wJoinSGroup(1, 2, 1), true);
+
ASSERT_EQ(rt->wLeaveSGroup(1, 2, 1), true);
ASSERT_EQ(rt->wLeaveSGroup(1, 1, 1), true);
@@ -26,6 +29,22 @@ TEST(CluserRuntimeTest, ZooKeeper){
delete rt;
}
+TEST(CluserRuntimeTest, ProcessManagement){
+ ClusterRuntime* rt = new ZKClusterRT(host);
+ ASSERT_EQ(rt->Init(), true);
+
+ ASSERT_EQ(rt->RegistProc("1.2.3.4:5"), 0);
+ ASSERT_EQ(rt->RegistProc("1.2.3.4:6"), 1);
+ ASSERT_EQ(rt->RegistProc("1.2.3.4:7"), 2);
+
+ ASSERT_NE(rt->GetProcHost(0), "");
+ ASSERT_NE(rt->GetProcHost(1), "");
+ ASSERT_NE(rt->GetProcHost(2), "");
+
+ sleep(3);
+ delete rt;
+}
+
/**
ClusterProto GenClusterProto(){
ClusterProto proto;
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index af6fbbc..514ea2e 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -1,4 +1,5 @@
#include "utils/cluster_rt.h"
+#include <algorithm>
using std::to_string;
@@ -25,61 +26,89 @@ bool ZKClusterRT::Init(){
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- zkhandle_ = zookeeper_init(host_.c_str(), watcherGlobal, timeout_, 0, "ZKClusterRT", 0);
+ zkhandle_ = zookeeper_init(host_.c_str(), WatcherGlobal, timeout_, 0, "ZKClusterRT", 0);
- if (zkhandle_ == nullptr){
+ if (zkhandle_ == NULL){
LOG(ERROR) << "Error when connecting to zookeeper servers...";
- LOG(ERROR) <<"Please ensure zookeeper service is up in host(s):";
+ LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
LOG(ERROR) << host_.c_str();
return false;
}
+ //create ZPATH_SINGA
+ if (!CreateZKNode(ZPATH_SINGA.c_str(), nullptr, 0, nullptr)) return false;
+ //create ZPATH_STATUS
+ if (!CreateZKNode(ZPATH_STATUS.c_str(), nullptr, 0, nullptr)) return false;
+ //create ZPATH_REGIST
+ if (!CreateZKNode(ZPATH_REGIST.c_str(), nullptr, 0, nullptr)) return false;
+ //create ZPATH_REGIST_PROC
+ if (!CreateZKNode(ZPATH_REGIST_PROC.c_str(), nullptr, 0, nullptr)) return false;
+ //create ZPATH_REGIST_LOCK
+ if (!CreateZKNode(ZPATH_REGIST_LOCK.c_str(), nullptr, 0, nullptr)) return false;
+
+ return true;
+}
+
+int ZKClusterRT::RegistProc(const string& host_addr){
+
char buf[MAX_BUF_LEN];
- //create ZK_P_SINGA
- string path = ZK_P_SINGA;
- int ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN);
- if (ret != ZOK && ret != ZNODEEXISTS){
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
- return false;
+ string lock = ZPATH_REGIST_LOCK+"/lock-";
+
+ if (!CreateZKNode(lock.c_str(), nullptr, ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)){
+ return -1;
}
- //create ZK_P_STATUS
- path += ZK_P_STATUS;
- ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN);
- if (ret != ZOK && ret != ZNODEEXISTS){
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
- return false;
+
+ //get all children in lock folder
+ vector<string> vt;
+ if (!GetZKChild(ZPATH_REGIST_LOCK.c_str(), vt)){
+ return -1;
}
- return true;
+ //find own position among all locks
+ int id = -1;
+ std::sort(vt.begin(), vt.end());
+ for (int i = 0; i < (int)vt.size(); ++i){
+ if (ZPATH_REGIST_LOCK+"/"+vt[i] == buf){
+ id = i;
+ break;
+ }
+ }
+
+ if (id == -1){
+ LOG(ERROR) << "cannot find own node " << buf;
+ return -1;
+ }
+
+ //create a new node in proc path
+ string path = ZPATH_REGIST_PROC+"/proc-"+to_string(id);
+ if (!CreateZKNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL, nullptr)){
+ return -1;
+ }
+
+ return id;
+}
+
+string ZKClusterRT::GetProcHost(int proc_id){
+
+ //char buf[MAX_BUF_LEN];
+ char val[MAX_BUF_LEN];
+
+ //construct file name
+ string path = ZPATH_REGIST_PROC+"/proc-"+to_string(proc_id);
+
+ if (!GetZKNode(path.c_str(), val)) return "";
+
+ return string(val);
}
bool ZKClusterRT::sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){
CHECK_NOTNULL(fn);
- string path = getSGroupPath(gid);
- struct Stat stat;
+ string path = groupPath(gid);
- //check existance of zk node
- int ret = zoo_exists(zkhandle_, path.c_str(), 0, &stat);
- //if have, pass
- if (ret == ZOK) ;
- //need to create zk node first
- else if (ret == ZNONODE){
- char buf[MAX_BUF_LEN];
- ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN);
- if (ret == ZOK){
- LOG(INFO) << "zookeeper node " << buf << " created";
- }
- else{
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
- return false;
- }
- }
- else{
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
- return false;
- }
+ //create zk node
+ if (!CreateZKNode(path.c_str(), nullptr, 0, nullptr)) return false;
struct String_vector child;
//store the callback function and context for later usage
@@ -88,72 +117,47 @@ bool ZKClusterRT::sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){
cb->ctx = ctx;
cb_vec_.push_back(cb);
//start to watch on the zk node, does not care about the first return value
- zoo_wget_children(zkhandle_, path.c_str(), childChanges, cb, &child);
+ int ret = zoo_wget_children(zkhandle_, path.c_str(), ChildChanges, cb, &child);
+
+ if (ret != ZOK){
+ LOG(ERROR) << "failed to get child of " << path;
+ return false;
+ }
return true;
}
bool ZKClusterRT::wJoinSGroup(int gid, int wid, int s_group){
- string path = getSGroupPath(s_group) + getWorkerPath(gid, wid);
- char buf[MAX_BUF_LEN];
-
- //try to create a file under the server group path
- for (int i = 0; i < RETRY_NUM; ++i){
- //send the zk request
- int ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, buf, MAX_BUF_LEN);
+ string path = groupPath(s_group) + workerPath(gid, wid);
- if (ret == ZOK){
- LOG(INFO) << "zookeeper node " << buf << " created";
- return true;
- }
- else if (ret == ZNODEEXISTS){
- LOG(WARNING) << "zookeeper node " << path << " already exist";
- return true;
- }
- //the parent node is not on, need to wait
- else if (ret == ZNONODE){
- LOG(WARNING) << "zookeeper parent node " << getSGroupPath(s_group) << " not exist, retry later";
- sleep(SLEEP_SEC);
- continue;
- }
-
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+ //try to create an ephemeral node under server group path
+ if (!CreateZKNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr)){
return false;
}
- LOG(ERROR) << "zookeeper parent node " << getSGroupPath(s_group) << " still not exist after " << RETRY_NUM << " tries";
- return false;
+ return true;
}
bool ZKClusterRT::wLeaveSGroup(int gid, int wid, int s_group){
- string path = getSGroupPath(s_group) + getWorkerPath(gid, wid);
-
- int ret = zoo_delete(zkhandle_, path.c_str(), -1);
- if (ret == ZOK){
- LOG(INFO) << "zookeeper node " << path << " deleted";
- return true;
- }
- else if (ret == ZNONODE){
- LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
- return true;
- }
+ string path = groupPath(s_group) + workerPath(gid, wid);
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_delete)";
- return false;
+ if (!DeleteZKNode(path.c_str())) return false;
+
+ return true;
}
-void ZKClusterRT::watcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx){
+void ZKClusterRT::WatcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx){
if (type == ZOO_SESSION_EVENT){
if (state == ZOO_CONNECTED_STATE)
- LOG(INFO) << "Connected to zookeeper service successfully!";
+ LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper service successfully!";
else if (state == ZOO_EXPIRED_SESSION_STATE)
- LOG(INFO) << "zookeeper session expired!";
+ LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
}
}
-void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx){
+void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx){
//check if already callback
RTCallback *cb = (RTCallback *)watcherCtx;
@@ -162,16 +166,10 @@ void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *p
if (type == ZOO_CHILD_EVENT){
struct String_vector child;
//check the child list and put another watcher
- int ret = zoo_wget_children(zh, path, childChanges, watcherCtx, &child);
- LOG(INFO) << "ret = " << ret;
+ int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child);
if (ret == ZOK){
- LOG(INFO) << "child.count = " << child.count;
if (child.count == 0){
- //LOG(ERROR) << "do call back";
- //LOG(ERROR) << "type = " << type;
- //LOG(ERROR) << "state = " << state;
- //LOG(ERROR) << "path = " << path;
-
+ LOG(INFO) << "child.count = 0 in path: " << path;
//all workers leave, we do callback now
(*cb->fn)(cb->ctx);
cb->fn = nullptr;
@@ -186,12 +184,94 @@ void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *p
}
}
-string ZKClusterRT::getSGroupPath(int gid){
- //return "/singa/status/sg"+to_string(gid);
- return ZK_P_SINGA+ZK_P_STATUS+"/sg"+to_string(gid);
+bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag, char* output){
+
+ char buf[MAX_BUF_LEN];
+ int ret;
+
+ //send the zk request
+ for (int i = 0; i < RETRY_NUM; ++i){
+ ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val), &ZOO_OPEN_ACL_UNSAFE, flag, buf, MAX_BUF_LEN);
+ if (ret != ZNONODE) break;
+ LOG(WARNING) << "zookeeper parent node of " << path << " not exist, retry later";
+ sleep(SLEEP_SEC);
+ }
+
+ //copy the node name ot output
+ if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)){
+ strcpy(output, buf);
+ }
+
+ if (ret == ZOK){
+ LOG(INFO) << "created zookeeper node " << buf << " (" << (val == nullptr ? "NULL" : val) << ")";
+ return true;
+ }
+ else if (ret == ZNODEEXISTS){
+ LOG(WARNING) << "zookeeper node " << path << " already exists";
+ return true;
+ }
+
+ LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+ return false;
+}
+
+bool ZKClusterRT::DeleteZKNode(const char* path){
+
+ int ret = zoo_delete(zkhandle_, path, -1);
+
+ if (ret == ZOK){
+ LOG(INFO) << "deleted zookeeper node " << path;
+ return true;
+ }
+ else if (ret == ZNONODE){
+ LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
+ return true;
+ }
+
+ LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_delete)";
+ return false;
+}
+
+bool ZKClusterRT::GetZKNode(const char* path, char* output){
+
+ struct Stat stat;
+ int val_len = MAX_BUF_LEN;
+
+ int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
+
+ if (ret == ZOK){
+ output[val_len] = 0;
+ return true;
+ }
+ else if (ret == ZNONODE){
+ LOG(ERROR) << "zk node " << path << " does not exist";
+ return false;
+ }
+
+ LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get)";
+ return false;
+}
+
+bool ZKClusterRT::GetZKChild(const char* path, vector<string>& vt){
+
+ //get all children in lock folder
+ struct String_vector child;
+ int ret = zoo_get_children(zkhandle_, path, 0, &child);
+
+ if (ret == ZOK){
+ for (int i = 0; i < child.count; ++i) vt.push_back(child.data[i]);
+ return true;
+ }
+
+ LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+ return false;
+}
+
+string ZKClusterRT::groupPath(int gid){
+ return ZPATH_STATUS+"/sg"+to_string(gid);
}
-string ZKClusterRT::getWorkerPath(int gid, int wid){
+string ZKClusterRT::workerPath(int gid, int wid){
return "/g"+to_string(gid)+"_w"+to_string(wid);
}