You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/06/25 15:45:01 UTC

[3/6] incubator-singa git commit: SINGA-8 Implement distributed Hogwild. Replaced hard-coded endpoints with RegistProc() and GetProcHost(), implemented with the help of ZooKeeper. TODO: slice large Param objects in a separate branch.

SINGA-8 Implement distributed Hogwild
Replaced hard-coded endpoints with RegistProc() and GetProcHost(), which are implemented with the help of ZooKeeper.
TODO: slice large Param objects in a separate branch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/51d4c2ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/51d4c2ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/51d4c2ae

Branch: refs/heads/master
Commit: 51d4c2aec4f9ddedeff588a916b25a0041dd8a88
Parents: f437011
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Fri Jun 19 14:35:50 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Thu Jun 25 11:49:32 2015 +0800

----------------------------------------------------------------------
 include/trainer/trainer.h   |  2 +-
 include/utils/cluster.h     | 19 ++++++++++---------
 include/utils/cluster_rt.h  |  6 +++---
 src/communication/socket.cc | 14 +++++++-------
 src/main.cc                 |  2 +-
 src/trainer/trainer.cc      |  8 +++++---
 src/utils/cluster.cc        | 29 ++++++++++++++++++++++++++---
 7 files changed, 53 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index fbbfd0b..fb716bc 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -89,7 +89,7 @@ class Trainer{
    * @param clusterproto
    */
   void Start(const ModelProto& modelproto, const ClusterProto& clusterproto,
-    int procs_id);
+    const int procs_id);
 
   // TODO add Resume() function to continue training from a previously stopped
   // point.

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 0eeb808..c11874e 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -23,7 +23,7 @@ namespace singa {
 class Cluster {
  public:
   static shared_ptr<Cluster> Get();
-  static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id);
+  static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id=0);
 
   const int nserver_groups()const{ return cluster_.nserver_groups(); }
   const int nworker_groups()const { return cluster_.nworker_groups(); }
@@ -71,17 +71,12 @@ class Cluster {
     return nprocs_;
   }
 
-  const string endpoint() const {
-    return endpoint(procs_id());
-  }
+
   /**
    * @return endpoint of the router of a procs with the specified id
    */
-  const string endpoint(int procs_id) const {
-    CHECK_LT(procs_id, nprocs_);
-    CHECK_GE(procs_id, 0);
-    return endpoints_.at(procs_id);
-  }
+  const string endpoint(const int procs_id) const;
+
   const string workspace() {return cluster_.workspace();}
   const string vis_folder(){
     return cluster_.workspace()+"/visualization";
@@ -127,6 +122,11 @@ class Cluster {
   }
 
   int ProcsIDOf(int group_id, int id, int flag);
+  const string hostname() const {
+    return hostname_;
+  }
+  void Register(const string& endpoint);
+
  private:
   Cluster(const ClusterProto &cluster, int procs_id) ;
   void SetupFolders(const ClusterProto &cluster);
@@ -135,6 +135,7 @@ class Cluster {
  private:
   int procs_id_;
   int nprocs_;
+  string hostname_;
   std::vector<std::string> endpoints_;
   // cluster config proto
   ClusterProto cluster_;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 08bea90..1b877ec 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -10,7 +10,7 @@ namespace singa {
 typedef void (*rt_callback)(void *contest);
 
 /**
- * ClusterRuntime is a runtime service that manages dynamic configuration 
+ * ClusterRuntime is a runtime service that manages dynamic configuration
  * and status of the whole cluster. It mainly provides following services:
  *    1)  Provide running status of each server/worker
  *    2)  Translate process id to (hostname:port)
@@ -35,8 +35,8 @@ class ClusterRuntime {
    */
   virtual std::string GetProcHost(int proc_id) = 0;
   /**
-   * Server: watch all workers in a server group, 
-   * will be notified when all workers have left 
+   * Server: watch all workers in a server group,
+   * will be notified when all workers have left
    */
   virtual bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) = 0;
   /**

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index 38c0d79..2039162 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -36,7 +36,6 @@ Dealer::Dealer(int id) : id_(id) {
   CHECK_NOTNULL(poller_);
 }
 
-<<<<<<< HEAD
 Dealer::~Dealer() {
   zsock_destroy(&dealer_);
 }
@@ -92,13 +91,14 @@ Router::~Router() {
   }
 }
 
-int Router::Bind(const std::string& endpoint) {
-  CHECK_GT(endpoint.length(), 0);
-  if (endpoint.length()) {
-    CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()), 0);
-    return 1;
+int Router::Bind(string endpoint){
+  int port=-1;
+  if(endpoint.length()){
+    port=zsock_bind(router_, "%s", endpoint.c_str());
   }
-  return 0;
+  CHECK_NE(port,-1)<<endpoint;
+  LOG(INFO)<<"bind successfully to "<<endpoint+":"+std::to_string(port);
+  return port;
 }
 
 int Router::Send(Msg **msg) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
index 77898be..851d528 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -19,7 +19,7 @@
  * easily, e.g., AddLayer(layer_type, source_layers, meta_data).
  */
 
-DEFINE_int32(procsID, 0, "Global process ID");
+DEFINE_int32(procsID, -1, "Global process ID");
 DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file");
 DEFINE_string(model, "examples/mnist/conv.conf", "Model config file");
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index bdc1416..cd80296 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -25,17 +25,14 @@ void Trainer::RegisterDefaultClasses(const singa::ModelProto& proto){
 }
 
 void HandleWorkerFinish(void * ctx){
-  /*
   HandleContext* hctx=static_cast<HandleContext*> (ctx);
   Msg* msg=new Msg();
   msg->set_src(-1,-1, kRuntime);
   msg->set_dst(hctx->group_id, hctx->id, kServer);
   msg->set_type(kStop);
   hctx->dealer->Send(&msg);
-  */
 }
 
-<<<<<<< HEAD
 const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
     const vector<shared_ptr<Param>>& params){
   std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
@@ -417,6 +414,11 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
           if (interprocs_dealers.find(dst_procs_id)==interprocs_dealers.end()){
             auto dealer=make_shared<Dealer>();
             interprocs_dealers[dst_procs_id]=dealer;
+            while(cluster->endpoint(dst_procs_id)==""){
+              std::this_thread::sleep_for(
+                  std::chrono::milliseconds(kCollectSleepTime));
+              LOG(ERROR)<<"waiting for procs "<< dst_procs_id<<" to register";
+            }
             dealer->Connect("tcp://"+cluster->endpoint(dst_procs_id));
           }
           if(bandwidth(amount, start) <=cluster->bandwidth()){

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 035e0c7..d022a64 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -1,5 +1,6 @@
 #include <glog/logging.h>
 #include <fcntl.h>
+#include <unistd.h>
 #include <fstream>
 #include "utils/cluster.h"
 #include "proto/cluster.pb.h"
@@ -13,12 +14,17 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
   procs_id_=procs_id;
   cluster_ = cluster;
   SetupFolders(cluster);
+  int nprocs;
   if(server_worker_separate())
     nprocs_=nworker_procs()+nserver_procs();
   else
-    nprocs_=std::max(nworker_procs(), nserver_procs());
-  CHECK_LT(procs_id, nprocs_);
-  if(nprocs_>1){
+    nprocs=std::max(nworker_procs(), nserver_procs());
+  CHECK_LT(procs_id, nprocs);
+  if (cluster_.has_nprocs())
+    CHECK_EQ(cluster.nprocs(), nprocs);
+  else
+    cluster_.set_nprocs(nprocs);
+  if(nprocs>1&&procs_id>-1){
     std::ifstream ifs(cluster.hostfile(), std::ifstream::in);
     std::string line;
     while(std::getline(ifs, line)
@@ -49,8 +55,25 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
   auto rt=new ZKClusterRT(cluster_.zookeeper_host());
   rt->Init();
   cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
+
+  char buf[128];
+  gethostname(buf, 128);
+  hostname_=string(buf);
 }
 
+void Cluster::Register(const string& endpoint){
+  procs_id_=cluster_rt_->RegistProc(endpoint);
+  CHECK_GE(procs_id_,0);
+  CHECK_LT(procs_id_,nprocs());
+}
+const string Cluster::endpoint(int procsid) const{
+  CHECK_LT(procsid, nprocs());
+  CHECK_GE(procsid, 0);
+  if(endpoints_.size())
+    return endpoints_.at(procsid);
+  else
+    return cluster_rt_->GetProcHost(procsid);
+}
 void Cluster::SetupFolders(const ClusterProto &cluster){
   // create visulization folder
   mkdir(vis_folder().c_str(),  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);