You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/09/26 15:31:26 UTC

incubator-singa git commit: SINGA-73 Refine the selection of available hosts

Repository: incubator-singa
Updated Branches:
  refs/heads/master 0c6e5c692 -> 4e15c3444


SINGA-73 Refine the selection of available hosts

JobManager reads the next available host index from ZooKeeper
and assigns new jobs starting from that host, instead of always starting from the first host

cluster_rt.h / cluster_rt.cc
 - add UpdateNode function in ZKService
 - add GenerateHostList function in JobManager
 - move extract_cluster from tool.cc to cluster_rt.cc as JobManager::ExtractClusterConf


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4e15c344
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4e15c344
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4e15c344

Branch: refs/heads/master
Commit: 4e15c3444bfb1f7fc3816eda2416007604137ab3
Parents: 0c6e5c6
Author: wang sheng <wa...@gmail.com>
Authored: Fri Sep 25 15:14:58 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Fri Sep 25 15:14:58 2015 +0800

----------------------------------------------------------------------
 include/utils/cluster_rt.h |   4 ++
 src/utils/cluster_rt.cc    | 114 +++++++++++++++++++++++++++++++++++++---
 src/utils/tool.cc          |  78 +++------------------------
 3 files changed, 119 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index c78bf75..bdfa8fd 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -35,6 +35,7 @@ const int kZKBufSize = 100;
 const std::string kZKPathSinga = "/singa";
 const std::string kZKPathSys =   "/singa/sys";
 const std::string kZKPathJLock = "/singa/sys/job-lock";
+const std::string kZKPathHostIdx = "/singa/sys/host-idx";
 const std::string kZKPathApp =   "/singa/app";
 const std::string kZKPathJob =   "/singa/app/job-";
 // following paths are local under /singa/app/job-X
@@ -72,6 +73,7 @@ class ZKService {
   bool CreateNode(const char* path, const char* val, int flag, char* output);
   bool DeleteNode(const char* path);
   bool Exist(const char* path);
+  bool UpdateNode(const char* path, const char* val);
   bool GetNode(const char* path, char* output);
   bool GetChild(const char* path, std::vector<std::string>* vt);
   bool WGetChild(const char* path, std::vector<std::string>* vt,
@@ -154,6 +156,7 @@ class JobManager {
 
   bool Init();
   bool GenerateJobID(int* id);
+  bool GenerateHostList(const char* job_file, std::vector<std::string>* list);
   bool ListJobs(std::vector<JobInfo>* jobs);
   bool ListJobProcs(int job, std::vector<std::string>* procs);
   bool Remove(int job);
@@ -164,6 +167,7 @@ class JobManager {
   const int kJobsNotRemoved = 10;
 
   bool CleanPath(const std::string& path, bool remove);
+  std::string ExtractClusterConf(const char* job_file);
 
   int timeout_ = 30000;
   std::string host_ = "";

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 265e80b..e51ac97 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -22,7 +22,12 @@
 #include "utils/cluster_rt.h"
 
 #include <glog/logging.h>
+#include <google/protobuf/text_format.h>
+#include <stdlib.h>
 #include <algorithm>
+#include <fstream>
+#include <iostream>
+#include "proto/job.pb.h"
 
 using std::string;
 using std::to_string;
@@ -94,9 +99,11 @@ bool ZKService::CreateNode(const char* path, const char* val, int flag,
     }
     sleep(kSleepSec);
   }
-  // copy the node name ot output
+  // copy the node name to output
   if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) {
-    strcpy(output, buf);
+    snprintf(output, kZKBufSize, "%s", buf);
+    // use snprintf instead of strcpy
+    // strcpy(output, buf);
   }
   if (ret == ZOK) {
     LOG(INFO) << "created zookeeper node " << buf
@@ -141,6 +148,20 @@ bool ZKService::Exist(const char* path) {
   return false;
 }
 
+bool ZKService::UpdateNode(const char* path, const char* val) {
+  // set version = -1, do not check content version
+  int ret = zoo_set(zkhandle_, path, val, strlen(val), -1);
+  if (ret == ZOK) {
+    return true;
+  } else if (ret == ZNONODE) {
+    LOG(ERROR) << "zk node " << path << " does not exist";
+    return false;
+  }
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_get " << path << ")";
+  return false;
+}
+
 bool ZKService::GetNode(const char* path, char* output) {
   struct Stat stat;
   int val_len = kZKBufSize;
@@ -153,7 +174,7 @@ bool ZKService::GetNode(const char* path, char* output) {
     return false;
   }
   LOG(FATAL) << "Unhandled ZK error code: " << ret
-            << " (zoo_get " << path << ")";
+             << " (zoo_get " << path << ")";
   return false;
 }
 
@@ -282,12 +303,11 @@ bool ClusterRuntime::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
 }
 
 std::string ClusterRuntime::GetProcHost(int proc_id) {
-  // char buf[kZKBufSize];
   char val[kZKBufSize];
   // construct file name
-  string path = proc_path_+"/proc-"+to_string(proc_id);
+  string path = proc_path_ + "/proc-" + to_string(proc_id);
   if (!zk_.GetNode(path.c_str(), val)) return "";
-  int len = strlen(val)-1;
+  int len = strlen(val) - 1;
   while (len && val[len] != '|') --len;
   CHECK(len);
   val[len] = '\0';
@@ -320,6 +340,8 @@ bool JobManager::Init() {
     return false;
   if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr))
     return false;
+  if (!zk_.CreateNode(kZKPathHostIdx.c_str(), "0", 0, nullptr))
+    return false;
   if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
     return false;
   return true;
@@ -332,7 +354,52 @@ bool JobManager::GenerateJobID(int* id) {
                         ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
     return false;
   }
-  *id = atoi(buf+strlen(buf)-10);
+  *id = atoi(buf + strlen(buf) - 10);
+  return true;
+}
+
+bool JobManager::GenerateHostList(const char* job_file, vector<string>* list) {
+  // compute required #process from job conf
+  ClusterProto cluster;
+  google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file),
+                                                &cluster);
+  int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group()
+                      / cluster.nworkers_per_procs();
+  int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group()
+                      / cluster.nservers_per_procs();
+  int nprocs = 0;
+  if (cluster.server_worker_separate())
+    nprocs = nworker_procs + nserver_procs;
+  else
+    nprocs = std::max(nworker_procs, nserver_procs);
+  // get available host list from global conf
+  std::ifstream hostfile("conf/hostfile");
+  if (!hostfile.is_open()) {
+    LOG(FATAL) << "Cannot open file: " << "conf/hostfile";
+  }
+  vector<string> hosts;
+  string host;
+  while (!hostfile.eof()) {
+    getline(hostfile, host);
+    if (!host.length() || host[0] == '#') continue;
+    hosts.push_back(host);
+  }
+  if (!hosts.size()) {
+    LOG(FATAL) << "Empty host file";
+  }
+  // read next host index
+  char val[kZKBufSize];
+  if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false;
+  int next = atoi(val);
+  // generate host list
+  list->clear();
+  for (int i = 0; i < nprocs; ++i) {
+    list->push_back(hosts[(next + i) % hosts.size()]);
+  }
+  // write next host index
+  next = (next + nprocs) % hosts.size();
+  snprintf(val, kZKBufSize, "%d", next);
+  if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false;
   return true;
 }
 
@@ -426,4 +493,37 @@ bool JobManager::CleanPath(const string& path, bool remove) {
   return true;
 }
 
+// extract cluster configuration part from the job config file
+// TODO(wangsh) improve this function to make it robust
+string JobManager::ExtractClusterConf(const char* job_file) {
+  std::ifstream fin(job_file);
+  CHECK(fin.is_open()) << "cannot open job conf file " << job_file;
+  string line;
+  string cluster;
+  bool in_cluster = false;
+  while (!fin.eof()) {
+    std::getline(fin, line);
+    if (in_cluster == false) {
+      size_t pos = line.find("cluster");
+      if (pos == std::string::npos) continue;
+      in_cluster = true;
+      line = line.substr(pos);
+      cluster = "";
+    }
+    if (in_cluster == true) {
+      cluster += line + "\n";
+      if (line.find("}") != std::string::npos)
+        in_cluster = false;
+    }
+  }
+  LOG(INFO) << "cluster configure: " << cluster;
+  size_t s_pos = cluster.find("{");
+  size_t e_pos = cluster.find("}");
+  if (s_pos == std::string::npos || e_pos == std::string::npos) {
+    LOG(FATAL) << "cannot extract valid cluster configuration in file: "
+               << job_file;
+  }
+  return cluster.substr(s_pos + 1, e_pos - s_pos-1);
+}
+
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/src/utils/tool.cc
----------------------------------------------------------------------
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
index 295dcf0..435129c 100644
--- a/src/utils/tool.cc
+++ b/src/utils/tool.cc
@@ -20,11 +20,9 @@
 *************************************************************/
 
 #include <glog/logging.h>
-#include <google/protobuf/text_format.h>
 #include <algorithm>
-#include <fstream>
-#include <iostream>
-#include "proto/job.pb.h"
+#include <string>
+#include <vector>
 #include "proto/singa.pb.h"
 #include "utils/cluster_rt.h"
 #include "utils/common.h"
@@ -52,75 +50,15 @@ int create() {
   return SUCCESS;
 }
 
-// extract cluster configuration part from the job config file
-// TODO(wangsh) improve this function to make it robust
-const std::string extract_cluster(const char* jobfile) {
-  std::ifstream fin;
-  fin.open(jobfile, std::ifstream::in);
-  CHECK(fin.is_open()) << "cannot open job conf file " << jobfile;
-  std::string line;
-  std::string cluster;
-  bool in_cluster = false;
-  while (std::getline(fin, line)) {
-    if (in_cluster == false) {
-      size_t pos = line.find("cluster");
-      if (pos == std::string::npos) continue;
-      in_cluster = true;
-      line = line.substr(pos);
-      cluster = "";
-    }
-    if (in_cluster == true) {
-      cluster += line + "\n";
-      if (line.find("}") != std::string::npos)
-        in_cluster = false;
-    }
-  }
-  LOG(INFO) << "cluster configure: " << cluster;
-  size_t s_pos = cluster.find("{");
-  size_t e_pos = cluster.find("}");
-  if (s_pos == std::string::npos || e_pos == std::string::npos) {
-    LOG(FATAL) << "cannot extract valid cluster configuration in file: "
-               << jobfile;
-  }
-  return cluster.substr(s_pos+1, e_pos-s_pos-1);
-}
-
 // generate a host list
 int genhost(char* job_conf) {
-  // compute required #process from job conf
-  singa::ClusterProto cluster;
-  google::protobuf::TextFormat::ParseFromString(extract_cluster(job_conf),
-      &cluster);
-  int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group()
-                      / cluster.nworkers_per_procs();
-  int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group()
-                      / cluster.nservers_per_procs();
-  int nprocs = 0;
-  if (cluster.server_worker_separate())
-    nprocs = nworker_procs + nserver_procs;
-  else
-    nprocs = std::max(nworker_procs, nserver_procs);
-
-  // get available host list from global conf
-  std::fstream hostfile("conf/hostfile");
-  if (!hostfile.is_open()) {
-    LOG(ERROR) << "Cannot open file: " << "conf/hostfile";
-    return RUN_ERR;
-  }
-  std::vector<std::string> hosts;
-  std::string host;
-  while (!hostfile.eof()) {
-    getline(hostfile, host);
-    if (!host.length() || host[0] == '#') continue;
-    hosts.push_back(host);
-  }
-  if (!hosts.size()) {
-    LOG(ERROR) << "Empty host file";
-    return RUN_ERR;
-  }
+  singa::JobManager mngr(global.zookeeper_host());
+  if (!mngr.Init()) return RUN_ERR;
+  std::vector<std::string> list;
+  if (!mngr.GenerateHostList(job_conf, &list)) return RUN_ERR;
   // output selected hosts
-  for (int i = 0; i < nprocs; ++i)
-    printf("%s\n", hosts[i % hosts.size()].c_str());
+  for (std::string host : list)
+    printf("%s\n", host.c_str());
   return SUCCESS;
 }