You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/06/17 10:00:35 UTC

[3/4] incubator-singa git commit: SINGA-16 Runtime Process id Management

SINGA-16 Runtime Process id Management

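Add two new features to the ClusterRuntime class:
  -- int RegistProc(const string& host_addr): register the process and get a unique process id (-1 if registration fails)
  -- string GetProcHost(int proc_id): translate a process id to its host address ("" if there is no such process id)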


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/da6e4dce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/da6e4dce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/da6e4dce

Branch: refs/heads/master
Commit: da6e4dce8ad6be5bd851425e539e350ea9db5173
Parents: 017b042
Author: wang sheng <wa...@gmail.com>
Authored: Wed Jun 17 14:02:59 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Wed Jun 17 14:08:11 2015 +0800

----------------------------------------------------------------------
 include/utils/cluster_rt.h |  37 ++++--
 src/test/test_cluster.cc   |  23 +++-
 src/utils/cluster_rt.cc    | 266 ++++++++++++++++++++++++++--------------
 3 files changed, 224 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 7b5f361..7ed7b68 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -15,7 +15,7 @@ namespace singa {
  * ClusterRuntime is a runtime service that manages dynamic configuration and status
  * of the whole cluster. It mainly provides following services:
  *    1)  Provide running status of each server/worker
- *    1)  Translate process id to (hostname:port)
+ *    2)  Translate process id to (hostname:port)
  */
 
 typedef void (*rt_callback)(void *contest);
@@ -31,6 +31,20 @@ class ClusterRuntime{
   virtual bool Init(){ return false;}
 
   /**
+   * register the process, and get a unique process id
+   *
+   * \return the process id, -1 if failed
+   */
+  virtual int RegistProc(const string& host_addr){ return -1;};
+
+  /**
+   * translate the process id to host address
+   *
+   * \return the host and port, "" if no such proc id
+   */
+  virtual string GetProcHost(int proc_id){ return "";};   
+
+  /**
    * Server: watch all workers in a server group, will be notified when all workers have left 
    */
   virtual bool sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){ return false;}
@@ -53,15 +67,21 @@ class ZKClusterRT : public ClusterRuntime{
   ZKClusterRT(string host, int timeout = 30000);
   ~ZKClusterRT();
   bool Init();
+  int RegistProc(const string& host_addr);
+  string GetProcHost(int proc_id);   
   bool sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx);
   bool wJoinSGroup(int gid, int wid, int s_group);
   bool wLeaveSGroup(int gid, int wid, int s_group);
-  static void watcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx);
   
  private:
-  static void childChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
-  string getSGroupPath(int gid);
-  string getWorkerPath(int gid, int wid);
+  static void WatcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx);
+  static void ChildChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
+  bool CreateZKNode(const char* path, const char* val, int flag, char* output);
+  bool DeleteZKNode(const char* path);
+  bool GetZKNode(const char* path, char* output);
+  bool GetZKChild(const char* path, vector<string>& vt);
+  string groupPath(int gid);
+  string workerPath(int gid, int wid);
 
   struct RTCallback{
     rt_callback fn;
@@ -76,8 +96,11 @@ class ZKClusterRT : public ClusterRuntime{
   const int MAX_BUF_LEN = 50;
   const int RETRY_NUM = 10;
   const int SLEEP_SEC = 1;
-  const string ZK_P_SINGA = "/singa";
-  const string ZK_P_STATUS = "/status";
+  const string ZPATH_SINGA = "/singa";
+  const string ZPATH_STATUS = "/singa/status";
+  const string ZPATH_REGIST = "/singa/regist";
+  const string ZPATH_REGIST_PROC = "/singa/regist/proc";
+  const string ZPATH_REGIST_LOCK = "/singa/regist/lock";
 };
 
 } // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/src/test/test_cluster.cc
----------------------------------------------------------------------
diff --git a/src/test/test_cluster.cc b/src/test/test_cluster.cc
index bb17149..b16d765 100644
--- a/src/test/test_cluster.cc
+++ b/src/test/test_cluster.cc
@@ -5,7 +5,7 @@
 
 using namespace singa;
 
-string folder="src/test/data/";
+//string folder="src/test/data/";
 
 string host="localhost:2181";
 
@@ -13,12 +13,15 @@ void zk_cb(void *contest){
   LOG(INFO) << "zk callback: " << (char *)contest;
 }
 
-TEST(CluserRuntimeTest, ZooKeeper){
+TEST(CluserRuntimeTest, GroupManagement){
   ClusterRuntime* rt = new ZKClusterRT(host);
   ASSERT_EQ(rt->Init(), true);
+  
   ASSERT_EQ(rt->sWatchSGroup(1, 1, zk_cb, "test call back"), true);
+  
   ASSERT_EQ(rt->wJoinSGroup(1, 1, 1), true);
   ASSERT_EQ(rt->wJoinSGroup(1, 2, 1), true);
+  
   ASSERT_EQ(rt->wLeaveSGroup(1, 2, 1), true);
   ASSERT_EQ(rt->wLeaveSGroup(1, 1, 1), true);
   
@@ -26,6 +29,22 @@ TEST(CluserRuntimeTest, ZooKeeper){
   delete rt;
 }
 
+TEST(CluserRuntimeTest, ProcessManagement){
+  ClusterRuntime* rt = new ZKClusterRT(host);
+  ASSERT_EQ(rt->Init(), true);
+  
+  ASSERT_EQ(rt->RegistProc("1.2.3.4:5"), 0);
+  ASSERT_EQ(rt->RegistProc("1.2.3.4:6"), 1);
+  ASSERT_EQ(rt->RegistProc("1.2.3.4:7"), 2);
+
+  ASSERT_NE(rt->GetProcHost(0), "");
+  ASSERT_NE(rt->GetProcHost(1), "");
+  ASSERT_NE(rt->GetProcHost(2), "");
+
+  sleep(3);
+  delete rt;
+}
+
 /**
 ClusterProto GenClusterProto(){
   ClusterProto proto;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index af6fbbc..514ea2e 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -1,4 +1,5 @@
 #include "utils/cluster_rt.h"
+#include <algorithm>
 
 using std::to_string;
 
@@ -25,61 +26,89 @@ bool ZKClusterRT::Init(){
 
   zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
 
-  zkhandle_ = zookeeper_init(host_.c_str(), watcherGlobal, timeout_, 0, "ZKClusterRT", 0);
+  zkhandle_ = zookeeper_init(host_.c_str(), WatcherGlobal, timeout_, 0, "ZKClusterRT", 0);
 
-  if (zkhandle_ == nullptr){
+  if (zkhandle_ == NULL){
     LOG(ERROR) << "Error when connecting to zookeeper servers...";
-    LOG(ERROR) <<"Please ensure zookeeper service is up in host(s):";
+    LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
     LOG(ERROR) << host_.c_str();
     return false;
   }
 
+  //create ZPATH_SINGA
+  if (!CreateZKNode(ZPATH_SINGA.c_str(), nullptr, 0, nullptr)) return false;
+  //create ZPATH_STATUS
+  if (!CreateZKNode(ZPATH_STATUS.c_str(), nullptr, 0, nullptr)) return false;
+  //create ZPATH_REGIST
+  if (!CreateZKNode(ZPATH_REGIST.c_str(), nullptr, 0, nullptr)) return false;
+  //create ZPATH_REGIST_PROC
+  if (!CreateZKNode(ZPATH_REGIST_PROC.c_str(), nullptr, 0, nullptr)) return false;
+  //create ZPATH_REGIST_LOCK
+  if (!CreateZKNode(ZPATH_REGIST_LOCK.c_str(), nullptr, 0, nullptr)) return false;
+
+  return true;
+}
+
+int ZKClusterRT::RegistProc(const string& host_addr){
+ 
   char buf[MAX_BUF_LEN];
-  //create ZK_P_SINGA
-  string path = ZK_P_SINGA;
-  int ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN);
-  if (ret != ZOK && ret != ZNODEEXISTS){
-    LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
-    return false;
+  string lock = ZPATH_REGIST_LOCK+"/lock-";
+
+  if (!CreateZKNode(lock.c_str(), nullptr, ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)){
+    return -1;
   }
-  //create ZK_P_STATUS
-  path += ZK_P_STATUS;
-  ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN);
-  if (ret != ZOK && ret != ZNODEEXISTS){
-    LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
-    return false;
+
+  //get all children in lock folder
+  vector<string> vt;
+  if (!GetZKChild(ZPATH_REGIST_LOCK.c_str(), vt)){
+    return -1;
   }
 
-  return true;
+  //find own position among all locks
+  int id = -1;
+  std::sort(vt.begin(), vt.end());
+  for (int i = 0; i < (int)vt.size(); ++i){
+    if (ZPATH_REGIST_LOCK+"/"+vt[i] == buf){
+      id = i;
+      break;
+    }
+  }
+
+  if (id == -1){
+    LOG(ERROR) << "cannot find own node " << buf;
+    return -1;
+  }
+
+  //create a new node in proc path
+  string path = ZPATH_REGIST_PROC+"/proc-"+to_string(id);
+  if (!CreateZKNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL, nullptr)){
+    return -1;
+  }
+
+  return id;
+}
+
+string ZKClusterRT::GetProcHost(int proc_id){
+  
+  //char buf[MAX_BUF_LEN];
+  char val[MAX_BUF_LEN];
+
+  //construct file name
+  string path = ZPATH_REGIST_PROC+"/proc-"+to_string(proc_id);
+
+  if (!GetZKNode(path.c_str(), val)) return "";
+
+  return string(val);
 }
 
 bool ZKClusterRT::sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){
 
   CHECK_NOTNULL(fn);
 
-  string path = getSGroupPath(gid);
-  struct Stat stat;
+  string path = groupPath(gid);
 
-  //check existance of zk node
-  int ret = zoo_exists(zkhandle_, path.c_str(), 0, &stat);
-  //if have, pass
-  if (ret == ZOK) ;
-  //need to create zk node first
-  else if (ret == ZNONODE){
-    char buf[MAX_BUF_LEN];
-    ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN);
-    if (ret == ZOK){
-      LOG(INFO) << "zookeeper node " << buf << " created";
-    }
-    else{
-      LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
-      return false;
-    }
-  }
-  else{
-    LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
-    return false;
-  }
+  //create zk node
+  if (!CreateZKNode(path.c_str(), nullptr, 0, nullptr)) return false;
 
   struct String_vector child;
   //store the callback function and context for later usage
@@ -88,72 +117,47 @@ bool ZKClusterRT::sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){
   cb->ctx = ctx;
   cb_vec_.push_back(cb);
   //start to watch on the zk node, does not care about the first return value
-  zoo_wget_children(zkhandle_, path.c_str(), childChanges, cb, &child);
+  int ret = zoo_wget_children(zkhandle_, path.c_str(), ChildChanges, cb, &child);
+
+  if (ret != ZOK){
+    LOG(ERROR) << "failed to get child of " << path;
+    return false;
+  }
 
   return true;
 }
 
 bool ZKClusterRT::wJoinSGroup(int gid, int wid, int s_group){
 
-  string path = getSGroupPath(s_group) + getWorkerPath(gid, wid);
-  char buf[MAX_BUF_LEN];
-
-  //try to create a file under the server group path
-  for (int i = 0; i < RETRY_NUM; ++i){
-    //send the zk request
-    int ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, buf, MAX_BUF_LEN);
+  string path = groupPath(s_group) + workerPath(gid, wid);
 
-    if (ret == ZOK){
-      LOG(INFO) << "zookeeper node " << buf << " created";
-      return true;
-    }
-    else if (ret == ZNODEEXISTS){
-      LOG(WARNING) << "zookeeper node " << path << " already exist";
-      return true;
-    }
-    //the parent node is not on, need to wait
-    else if (ret == ZNONODE){
-      LOG(WARNING) << "zookeeper parent node " << getSGroupPath(s_group) << " not exist, retry later";
-      sleep(SLEEP_SEC);
-      continue;
-    }
-
-    LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+  //try to create an ephemeral node under server group path
+  if (!CreateZKNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr)){
     return false;
   }
 
-  LOG(ERROR) << "zookeeper parent node " << getSGroupPath(s_group) << " still not exist after " << RETRY_NUM << " tries";
-  return false;
+  return true;
 }
 
 bool ZKClusterRT::wLeaveSGroup(int gid, int wid, int s_group){
 
-  string path = getSGroupPath(s_group) + getWorkerPath(gid, wid);
-
-  int ret = zoo_delete(zkhandle_, path.c_str(), -1);
-  if (ret == ZOK){
-    LOG(INFO) << "zookeeper node " << path << " deleted";
-    return true;
-  }
-  else if (ret == ZNONODE){
-    LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
-    return true;
-  }
+  string path = groupPath(s_group) + workerPath(gid, wid);
 
-  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_delete)";
-  return false;
+  if (!DeleteZKNode(path.c_str())) return false;
+  
+  return true;
 }
 
-void ZKClusterRT::watcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx){
+void ZKClusterRT::WatcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx){
   if (type == ZOO_SESSION_EVENT){
     if (state == ZOO_CONNECTED_STATE)
-      LOG(INFO) << "Connected to zookeeper service successfully!";
+      LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper service successfully!";
     else if (state == ZOO_EXPIRED_SESSION_STATE)
-      LOG(INFO) << "zookeeper session expired!";
+      LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
   }
 }
 
-void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx){
+void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx){
 
   //check if already callback
   RTCallback *cb = (RTCallback *)watcherCtx;
@@ -162,16 +166,10 @@ void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *p
   if (type == ZOO_CHILD_EVENT){
     struct String_vector child;
     //check the child list and put another watcher
-    int ret = zoo_wget_children(zh, path, childChanges, watcherCtx, &child);
-    LOG(INFO) << "ret = " << ret;
+    int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child);
     if (ret == ZOK){
-      LOG(INFO) << "child.count = " << child.count;
       if (child.count == 0){
-        //LOG(ERROR) << "do call back";
-        //LOG(ERROR) << "type = " << type;
-        //LOG(ERROR) << "state = " << state;
-        //LOG(ERROR) << "path = " << path;
-        
+        LOG(INFO) << "child.count = 0 in path: " << path;
         //all workers leave, we do callback now
         (*cb->fn)(cb->ctx);
         cb->fn = nullptr;
@@ -186,12 +184,94 @@ void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *p
   }
 }
 
-string ZKClusterRT::getSGroupPath(int gid){
-  //return "/singa/status/sg"+to_string(gid);
-  return ZK_P_SINGA+ZK_P_STATUS+"/sg"+to_string(gid);
+bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag, char* output){
+  
+  char buf[MAX_BUF_LEN];
+  int ret;
+
+  //send the zk request
+  for (int i = 0; i < RETRY_NUM; ++i){
+    ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val), &ZOO_OPEN_ACL_UNSAFE, flag, buf, MAX_BUF_LEN);
+    if (ret != ZNONODE) break;
+    LOG(WARNING) << "zookeeper parent node of " << path << " not exist, retry later";
+    sleep(SLEEP_SEC);
+  }
+ 
+  //copy the node name ot output
+  if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)){
+    strcpy(output, buf);
+  }
+
+  if (ret == ZOK){
+    LOG(INFO) << "created zookeeper node " << buf << " (" << (val == nullptr ? "NULL" : val) << ")";
+    return true;
+  }
+  else if (ret == ZNODEEXISTS){
+    LOG(WARNING) << "zookeeper node " << path << " already exists";
+    return true;
+  }
+    
+  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+  return false;
+}
+
+bool ZKClusterRT::DeleteZKNode(const char* path){
+  
+  int ret = zoo_delete(zkhandle_, path, -1);
+  
+  if (ret == ZOK){
+    LOG(INFO) << "deleted zookeeper node " << path;
+    return true;
+  }
+  else if (ret == ZNONODE){
+    LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
+    return true;
+  }
+
+  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_delete)";
+  return false;
+}
+
+bool ZKClusterRT::GetZKNode(const char* path, char* output){
+  
+  struct Stat stat;
+  int val_len = MAX_BUF_LEN;
+
+  int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
+
+  if (ret == ZOK){
+    output[val_len] = 0;
+    return true;
+  }
+  else if (ret == ZNONODE){
+    LOG(ERROR) << "zk node " << path << " does not exist"; 
+    return false;
+  }
+  
+  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get)";
+  return false;
+}
+
+bool ZKClusterRT::GetZKChild(const char* path, vector<string>& vt){
+
+  //get all children in lock folder
+  struct String_vector child;
+  int ret = zoo_get_children(zkhandle_, path, 0, &child);
+
+  if (ret == ZOK){
+    for (int i = 0; i < child.count; ++i) vt.push_back(child.data[i]);
+    return true;
+  }
+  
+  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+  return false;
+}
+
+string ZKClusterRT::groupPath(int gid){
+  return ZPATH_STATUS+"/sg"+to_string(gid);
 }
 
-string ZKClusterRT::getWorkerPath(int gid, int wid){
+string ZKClusterRT::workerPath(int gid, int wid){
   return "/g"+to_string(gid)+"_w"+to_string(wid);
 }