You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/09/26 15:31:26 UTC
incubator-singa git commit: SINGA-73 Refine the selection of
available hosts
Repository: incubator-singa
Updated Branches:
refs/heads/master 0c6e5c692 -> 4e15c3444
SINGA-73 Refine the selection of available hosts
JobManager reads the index of the next available host from ZooKeeper
and assigns new jobs starting from that host, instead of always starting from the first host
cluster_rt.h / .cc
- add UpdateNode function in ZKService
- add GenerateHostList function in JobManager
- move extract_cluster from tool.cc to cluster_rt.cc as JobManager::ExtractClusterConf
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4e15c344
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4e15c344
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4e15c344
Branch: refs/heads/master
Commit: 4e15c3444bfb1f7fc3816eda2416007604137ab3
Parents: 0c6e5c6
Author: wang sheng <wa...@gmail.com>
Authored: Fri Sep 25 15:14:58 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Fri Sep 25 15:14:58 2015 +0800
----------------------------------------------------------------------
include/utils/cluster_rt.h | 4 ++
src/utils/cluster_rt.cc | 114 +++++++++++++++++++++++++++++++++++++---
src/utils/tool.cc | 78 +++------------------------
3 files changed, 119 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index c78bf75..bdfa8fd 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -35,6 +35,7 @@ const int kZKBufSize = 100;
const std::string kZKPathSinga = "/singa";
const std::string kZKPathSys = "/singa/sys";
const std::string kZKPathJLock = "/singa/sys/job-lock";
+// znode storing (as decimal text) the index into conf/hostfile of the next
+// host to assign; created as "0" by JobManager::Init and advanced by
+// JobManager::GenerateHostList
+const std::string kZKPathHostIdx = "/singa/sys/host-idx";
const std::string kZKPathApp = "/singa/app";
const std::string kZKPathJob = "/singa/app/job-";
// following paths are local under /singa/app/job-X
@@ -72,6 +73,7 @@ class ZKService {
bool CreateNode(const char* path, const char* val, int flag, char* output);
bool DeleteNode(const char* path);
bool Exist(const char* path);
+ // overwrite the data of an existing node without a version check;
+ // returns false if the node does not exist
+ bool UpdateNode(const char* path, const char* val);
bool GetNode(const char* path, char* output);
bool GetChild(const char* path, std::vector<std::string>* vt);
bool WGetChild(const char* path, std::vector<std::string>* vt,
@@ -154,6 +156,7 @@ class JobManager {
bool Init();
bool GenerateJobID(int* id);
+ // fill *list with one host per process required by job_file, taken
+ // round-robin from conf/hostfile starting at the index stored under
+ // kZKPathHostIdx, then advance that index; returns false on failure
+ bool GenerateHostList(const char* job_file, std::vector<std::string>* list);
bool ListJobs(std::vector<JobInfo>* jobs);
bool ListJobProcs(int job, std::vector<std::string>* procs);
bool Remove(int job);
@@ -164,6 +167,7 @@ class JobManager {
const int kJobsNotRemoved = 10;
bool CleanPath(const std::string& path, bool remove);
+ // return the text inside the braces of the "cluster" section of job_file
+ std::string ExtractClusterConf(const char* job_file);
int timeout_ = 30000;
std::string host_ = "";
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 265e80b..e51ac97 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -22,7 +22,12 @@
#include "utils/cluster_rt.h"
#include <glog/logging.h>
+#include <google/protobuf/text_format.h>
+#include <stdlib.h>
#include <algorithm>
+#include <fstream>
+#include <iostream>
+#include "proto/job.pb.h"
using std::string;
using std::to_string;
@@ -94,9 +99,11 @@ bool ZKService::CreateNode(const char* path, const char* val, int flag,
}
sleep(kSleepSec);
}
- // copy the node name ot output
+ // copy the node name to output
if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) {
- strcpy(output, buf);
+ snprintf(output, kZKBufSize, "%s", buf);
+ // use snprintf instead of strcpy
+ // strcpy(output, buf);
}
if (ret == ZOK) {
LOG(INFO) << "created zookeeper node " << buf
@@ -141,6 +148,20 @@ bool ZKService::Exist(const char* path) {
return false;
}
+// Overwrite the data stored at an existing zookeeper node.
+// Returns true on success, false (after logging an error) if the node does
+// not exist; any other ZooKeeper error is fatal.
+// A nullptr val stores NULL data, matching CreateNode, which also accepts a
+// nullptr value.
+bool ZKService::UpdateNode(const char* path, const char* val) {
+  // zoo_set requires buflen == -1 together with a NULL buffer to store NULL
+  // data; calling strlen on nullptr would be undefined behavior
+  int len = (val == nullptr) ? -1 : static_cast<int>(strlen(val));
+  // version = -1: overwrite unconditionally, do not check content version
+  int ret = zoo_set(zkhandle_, path, val, len, -1);
+  if (ret == ZOK) {
+    return true;
+  } else if (ret == ZNONODE) {
+    LOG(ERROR) << "zk node " << path << " does not exist";
+    return false;
+  }
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_set " << path << ")";
+  return false;
+}
+
bool ZKService::GetNode(const char* path, char* output) {
struct Stat stat;
int val_len = kZKBufSize;
@@ -153,7 +174,7 @@ bool ZKService::GetNode(const char* path, char* output) {
return false;
}
LOG(FATAL) << "Unhandled ZK error code: " << ret
- << " (zoo_get " << path << ")";
+ << " (zoo_get " << path << ")";
return false;
}
@@ -282,12 +303,11 @@ bool ClusterRuntime::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
}
std::string ClusterRuntime::GetProcHost(int proc_id) {
- // char buf[kZKBufSize];
char val[kZKBufSize];
// construct file name
- string path = proc_path_+"/proc-"+to_string(proc_id);
+ string path = proc_path_ + "/proc-" + to_string(proc_id);
if (!zk_.GetNode(path.c_str(), val)) return "";
- int len = strlen(val)-1;
+ int len = strlen(val) - 1;
while (len && val[len] != '|') --len;
CHECK(len);
val[len] = '\0';
@@ -320,6 +340,8 @@ bool JobManager::Init() {
return false;
if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr))
return false;
+ if (!zk_.CreateNode(kZKPathHostIdx.c_str(), "0", 0, nullptr))
+ return false;
if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
return false;
return true;
@@ -332,7 +354,52 @@ bool JobManager::GenerateJobID(int* id) {
ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
return false;
}
- *id = atoi(buf+strlen(buf)-10);
+ *id = atoi(buf + strlen(buf) - 10);
+ return true;
+}
+
+// Build the list of hosts on which the processes of a new job will run.
+// The number of processes is computed from the cluster section of job_file.
+// Hosts are read from conf/hostfile (relative to the working directory;
+// empty lines and lines starting with '#' are skipped) and assigned
+// round-robin, starting at the index stored under kZKPathHostIdx. The stored
+// index is then advanced past the assigned hosts so the next job continues
+// where this one stopped.
+// Returns false (after logging) if the hostfile cannot be opened or has no
+// hosts, or if the zookeeper index cannot be read or written.
+// NOTE(review): the GetNode/UpdateNode pair below is not atomic (UpdateNode
+// does no version check), so two jobs submitted concurrently may be given the
+// same starting host.
+bool JobManager::GenerateHostList(const char* job_file, vector<string>* list) {
+  // compute required #process from job conf
+  ClusterProto cluster;
+  google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file),
+                                                &cluster);
+  int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group()
+                      / cluster.nworkers_per_procs();
+  int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group()
+                      / cluster.nservers_per_procs();
+  int nprocs = 0;
+  if (cluster.server_worker_separate())
+    nprocs = nworker_procs + nserver_procs;
+  else
+    nprocs = std::max(nworker_procs, nserver_procs);
+  // get available host list from global conf
+  std::ifstream hostfile("conf/hostfile");
+  if (!hostfile.is_open()) {
+    // report failure to the caller (genhost maps false to RUN_ERR) instead of
+    // aborting the whole process
+    LOG(ERROR) << "Cannot open file: " << "conf/hostfile";
+    return false;
+  }
+  vector<string> hosts;
+  string host;
+  // loop on getline itself rather than eof(): a read error sets failbit or
+  // badbit without eofbit, which would make an eof()-driven loop spin forever
+  while (std::getline(hostfile, host)) {
+    if (host.empty() || host[0] == '#') continue;
+    hosts.push_back(host);
+  }
+  if (hosts.empty()) {
+    LOG(ERROR) << "Empty host file";
+    return false;
+  }
+  // read next host index
+  char val[kZKBufSize];
+  if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false;
+  int next = atoi(val);
+  // generate host list, wrapping around the hostfile
+  list->clear();
+  for (int i = 0; i < nprocs; ++i) {
+    list->push_back(hosts[(next + i) % hosts.size()]);
+  }
+  // write next host index
+  next = (next + nprocs) % hosts.size();
+  snprintf(val, kZKBufSize, "%d", next);
+  if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false;
  return true;
}
@@ -426,4 +493,37 @@ bool JobManager::CleanPath(const string& path, bool remove) {
return true;
}
+// extract cluster configuration part from the job config file
+// Returns the text between the first '{' and the first '}' of the captured
+// "cluster" section. The file is scanned line by line: outside a capture,
+// any line containing "cluster" starts a new capture (from that word on),
+// and the capture ends at the first line containing '}'. Each new capture
+// replaces the previous one, so the last one in the file is used. Nested
+// braces are not supported.
+// TODO(wangsh) improve this function to make it robust
+string JobManager::ExtractClusterConf(const char* job_file) {
+  std::ifstream fin(job_file);
+  CHECK(fin.is_open()) << "cannot open job conf file " << job_file;
+  string line;
+  string cluster;
+  bool in_cluster = false;
+  // loop on getline itself (as the original tool.cc version did) rather than
+  // eof(), so a read error cannot make the loop spin forever
+  while (std::getline(fin, line)) {
+    if (!in_cluster) {
+      size_t pos = line.find("cluster");
+      if (pos == string::npos) continue;
+      in_cluster = true;
+      line = line.substr(pos);
+      cluster = "";
+    }
+    // here we are always inside a capture
+    cluster += line + "\n";
+    if (line.find('}') != string::npos)
+      in_cluster = false;
+  }
+  LOG(INFO) << "cluster configure: " << cluster;
+  size_t s_pos = cluster.find('{');
+  size_t e_pos = cluster.find('}');
+  // a '}' before the '{' would make the substr length below wrap around
+  if (s_pos == string::npos || e_pos == string::npos || e_pos < s_pos) {
+    LOG(FATAL) << "cannot extract valid cluster configuration in file: "
+               << job_file;
+  }
+  return cluster.substr(s_pos + 1, e_pos - s_pos - 1);
+}
+
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/src/utils/tool.cc
----------------------------------------------------------------------
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
index 295dcf0..435129c 100644
--- a/src/utils/tool.cc
+++ b/src/utils/tool.cc
@@ -20,11 +20,9 @@
*************************************************************/
#include <glog/logging.h>
-#include <google/protobuf/text_format.h>
#include <algorithm>
-#include <fstream>
-#include <iostream>
-#include "proto/job.pb.h"
+#include <string>
+#include <vector>
#include "proto/singa.pb.h"
#include "utils/cluster_rt.h"
#include "utils/common.h"
@@ -52,75 +50,15 @@ int create() {
return SUCCESS;
}
-// extract cluster configuration part from the job config file
-// TODO(wangsh) improve this function to make it robust
-const std::string extract_cluster(const char* jobfile) {
- std::ifstream fin;
- fin.open(jobfile, std::ifstream::in);
- CHECK(fin.is_open()) << "cannot open job conf file " << jobfile;
- std::string line;
- std::string cluster;
- bool in_cluster = false;
- while (std::getline(fin, line)) {
- if (in_cluster == false) {
- size_t pos = line.find("cluster");
- if (pos == std::string::npos) continue;
- in_cluster = true;
- line = line.substr(pos);
- cluster = "";
- }
- if (in_cluster == true) {
- cluster += line + "\n";
- if (line.find("}") != std::string::npos)
- in_cluster = false;
- }
- }
- LOG(INFO) << "cluster configure: " << cluster;
- size_t s_pos = cluster.find("{");
- size_t e_pos = cluster.find("}");
- if (s_pos == std::string::npos || e_pos == std::string::npos) {
- LOG(FATAL) << "cannot extract valid cluster configuration in file: "
- << jobfile;
- }
- return cluster.substr(s_pos+1, e_pos-s_pos-1);
-}
-
// generate a host list
+// Print, one per line, the hosts selected by JobManager::GenerateHostList for
+// the job in job_conf. Returns RUN_ERR if the job manager cannot be
+// initialized or the host list cannot be generated, SUCCESS otherwise.
int genhost(char* job_conf) {
- // compute required #process from job conf
- singa::ClusterProto cluster;
- google::protobuf::TextFormat::ParseFromString(extract_cluster(job_conf),
- &cluster);
- int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group()
- / cluster.nworkers_per_procs();
- int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group()
- / cluster.nservers_per_procs();
- int nprocs = 0;
- if (cluster.server_worker_separate())
- nprocs = nworker_procs + nserver_procs;
- else
- nprocs = std::max(nworker_procs, nserver_procs);
-
- // get available host list from global conf
- std::fstream hostfile("conf/hostfile");
- if (!hostfile.is_open()) {
- LOG(ERROR) << "Cannot open file: " << "conf/hostfile";
- return RUN_ERR;
- }
- std::vector<std::string> hosts;
- std::string host;
- while (!hostfile.eof()) {
- getline(hostfile, host);
- if (!host.length() || host[0] == '#') continue;
- hosts.push_back(host);
- }
- if (!hosts.size()) {
- LOG(ERROR) << "Empty host file";
- return RUN_ERR;
- }
+ singa::JobManager mngr(global.zookeeper_host());
+ if (!mngr.Init()) return RUN_ERR;
+ std::vector<std::string> list;
+ if (!mngr.GenerateHostList(job_conf, &list)) return RUN_ERR;
// output selected hosts
- for (int i = 0; i < nprocs; ++i)
- printf("%s\n", hosts[i % hosts.size()].c_str());
+ for (const std::string& host : list)
+ printf("%s\n", host.c_str());
return SUCCESS;
}