You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/06/25 15:45:01 UTC
[3/6] incubator-singa git commit: SINGA-8 Implement distributed
Hogwild. Replaced hard-coded endpoints with RegistProc() and GetProcHost(),
implemented with the help of ZooKeeper. TODO: slice large Param objects in a
separate branch.
SINGA-8 Implement distributed Hogwild
Replaced hard-coded endpoints with RegistProc() and GetProcHost(), implemented with the help of ZooKeeper.
TODO: slice large Param objects in a separate branch.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/51d4c2ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/51d4c2ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/51d4c2ae
Branch: refs/heads/master
Commit: 51d4c2aec4f9ddedeff588a916b25a0041dd8a88
Parents: f437011
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Fri Jun 19 14:35:50 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Thu Jun 25 11:49:32 2015 +0800
----------------------------------------------------------------------
include/trainer/trainer.h | 2 +-
include/utils/cluster.h | 19 ++++++++++---------
include/utils/cluster_rt.h | 6 +++---
src/communication/socket.cc | 14 +++++++-------
src/main.cc | 2 +-
src/trainer/trainer.cc | 8 +++++---
src/utils/cluster.cc | 29 ++++++++++++++++++++++++++---
7 files changed, 53 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index fbbfd0b..fb716bc 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -89,7 +89,7 @@ class Trainer{
* @param clusterproto
*/
void Start(const ModelProto& modelproto, const ClusterProto& clusterproto,
- int procs_id);
+ const int procs_id);
// TODO add Resume() function to continue training from a previously stopped
// point.
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 0eeb808..c11874e 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -23,7 +23,7 @@ namespace singa {
class Cluster {
public:
static shared_ptr<Cluster> Get();
- static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id);
+ static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id=0);
const int nserver_groups()const{ return cluster_.nserver_groups(); }
const int nworker_groups()const { return cluster_.nworker_groups(); }
@@ -71,17 +71,12 @@ class Cluster {
return nprocs_;
}
- const string endpoint() const {
- return endpoint(procs_id());
- }
+
/**
* @return endpoint of the router of a procs with the specified id
*/
- const string endpoint(int procs_id) const {
- CHECK_LT(procs_id, nprocs_);
- CHECK_GE(procs_id, 0);
- return endpoints_.at(procs_id);
- }
+ const string endpoint(const int procs_id) const;
+
const string workspace() {return cluster_.workspace();}
const string vis_folder(){
return cluster_.workspace()+"/visualization";
@@ -127,6 +122,11 @@ class Cluster {
}
int ProcsIDOf(int group_id, int id, int flag);
+ const string hostname() const {
+ return hostname_;
+ }
+ void Register(const string& endpoint);
+
private:
Cluster(const ClusterProto &cluster, int procs_id) ;
void SetupFolders(const ClusterProto &cluster);
@@ -135,6 +135,7 @@ class Cluster {
private:
int procs_id_;
int nprocs_;
+ string hostname_;
std::vector<std::string> endpoints_;
// cluster config proto
ClusterProto cluster_;
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 08bea90..1b877ec 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -10,7 +10,7 @@ namespace singa {
typedef void (*rt_callback)(void *contest);
/**
- * ClusterRuntime is a runtime service that manages dynamic configuration
+ * ClusterRuntime is a runtime service that manages dynamic configuration
* and status of the whole cluster. It mainly provides following services:
* 1) Provide running status of each server/worker
* 2) Translate process id to (hostname:port)
@@ -35,8 +35,8 @@ class ClusterRuntime {
*/
virtual std::string GetProcHost(int proc_id) = 0;
/**
- * Server: watch all workers in a server group,
- * will be notified when all workers have left
+ * Server: watch all workers in a server group,
+ * will be notified when all workers have left
*/
virtual bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) = 0;
/**
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index 38c0d79..2039162 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -36,7 +36,6 @@ Dealer::Dealer(int id) : id_(id) {
CHECK_NOTNULL(poller_);
}
-<<<<<<< HEAD
Dealer::~Dealer() {
zsock_destroy(&dealer_);
}
@@ -92,13 +91,14 @@ Router::~Router() {
}
}
-int Router::Bind(const std::string& endpoint) {
- CHECK_GT(endpoint.length(), 0);
- if (endpoint.length()) {
- CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()), 0);
- return 1;
+int Router::Bind(string endpoint){
+ int port=-1;
+ if(endpoint.length()){
+ port=zsock_bind(router_, "%s", endpoint.c_str());
}
- return 0;
+ CHECK_NE(port,-1)<<endpoint;
+ LOG(INFO)<<"bind successfully to "<<endpoint+":"+std::to_string(port);
+ return port;
}
int Router::Send(Msg **msg) {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
index 77898be..851d528 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -19,7 +19,7 @@
* easily, e.g., AddLayer(layer_type, source_layers, meta_data).
*/
-DEFINE_int32(procsID, 0, "Global process ID");
+DEFINE_int32(procsID, -1, "Global process ID");
DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file");
DEFINE_string(model, "examples/mnist/conv.conf", "Model config file");
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index bdc1416..cd80296 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -25,17 +25,14 @@ void Trainer::RegisterDefaultClasses(const singa::ModelProto& proto){
}
void HandleWorkerFinish(void * ctx){
- /*
HandleContext* hctx=static_cast<HandleContext*> (ctx);
Msg* msg=new Msg();
msg->set_src(-1,-1, kRuntime);
msg->set_dst(hctx->group_id, hctx->id, kServer);
msg->set_type(kStop);
hctx->dealer->Send(&msg);
- */
}
-<<<<<<< HEAD
const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
const vector<shared_ptr<Param>>& params){
std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
@@ -417,6 +414,11 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
if (interprocs_dealers.find(dst_procs_id)==interprocs_dealers.end()){
auto dealer=make_shared<Dealer>();
interprocs_dealers[dst_procs_id]=dealer;
+ while(cluster->endpoint(dst_procs_id)==""){
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(kCollectSleepTime));
+ LOG(ERROR)<<"waiting for procs "<< dst_procs_id<<" to register";
+ }
dealer->Connect("tcp://"+cluster->endpoint(dst_procs_id));
}
if(bandwidth(amount, start) <=cluster->bandwidth()){
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 035e0c7..d022a64 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -1,5 +1,6 @@
#include <glog/logging.h>
#include <fcntl.h>
+#include <unistd.h>
#include <fstream>
#include "utils/cluster.h"
#include "proto/cluster.pb.h"
@@ -13,12 +14,17 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
procs_id_=procs_id;
cluster_ = cluster;
SetupFolders(cluster);
+ int nprocs;
if(server_worker_separate())
nprocs_=nworker_procs()+nserver_procs();
else
- nprocs_=std::max(nworker_procs(), nserver_procs());
- CHECK_LT(procs_id, nprocs_);
- if(nprocs_>1){
+ nprocs=std::max(nworker_procs(), nserver_procs());
+ CHECK_LT(procs_id, nprocs);
+ if (cluster_.has_nprocs())
+ CHECK_EQ(cluster.nprocs(), nprocs);
+ else
+ cluster_.set_nprocs(nprocs);
+ if(nprocs>1&&procs_id>-1){
std::ifstream ifs(cluster.hostfile(), std::ifstream::in);
std::string line;
while(std::getline(ifs, line)
@@ -49,8 +55,25 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
auto rt=new ZKClusterRT(cluster_.zookeeper_host());
rt->Init();
cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
+
+ char buf[128];
+ gethostname(buf, 128);
+ hostname_=string(buf);
}
+void Cluster::Register(const string& endpoint){
+ procs_id_=cluster_rt_->RegistProc(endpoint);
+ CHECK_GE(procs_id_,0);
+ CHECK_LT(procs_id_,nprocs());
+}
+const string Cluster::endpoint(int procsid) const{
+ CHECK_LT(procsid, nprocs());
+ CHECK_GE(procsid, 0);
+ if(endpoints_.size())
+ return endpoints_.at(procsid);
+ else
+ return cluster_rt_->GetProcHost(procsid);
+}
void Cluster::SetupFolders(const ClusterProto &cluster){
// create visulization folder
mkdir(vis_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);