You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/05/12 08:00:24 UTC

incubator-singa git commit: fix bugs from zmq sockets: move sockets creation to sub threads.

Repository: incubator-singa
Updated Branches:
  refs/heads/master 9ca741712 -> 06f85e23e


fix bugs from zmq sockets: move sockets creation to sub threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/06f85e23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/06f85e23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/06f85e23

Branch: refs/heads/master
Commit: 06f85e23eb0a5c969b7758716cf4090b4de302ce
Parents: 9ca7417
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Tue May 12 13:58:02 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Tue May 12 13:58:02 2015 +0800

----------------------------------------------------------------------
 examples/cifar10/Makefile |  2 +-
 include/trainer/server.h  |  7 +++----
 include/trainer/worker.h  |  9 ++++-----
 src/trainer/server.cc     | 10 +++++-----
 src/trainer/trainer.cc    | 15 +++++----------
 src/trainer/worker.cc     | 27 ++++++++++++---------------
 6 files changed, 30 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/examples/cifar10/Makefile
----------------------------------------------------------------------
diff --git a/examples/cifar10/Makefile b/examples/cifar10/Makefile
index 40fece6..16c329f 100644
--- a/examples/cifar10/Makefile
+++ b/examples/cifar10/Makefile
@@ -5,7 +5,7 @@ libs :=singa glog protobuf
 download: cifar-10-binary-bin
 
 cifar-10-binary-bin:
-	wget http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz
+	#wget http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz
 	tar xf cifar-10-binary.tar.gz
 
 create:

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/include/trainer/server.h
----------------------------------------------------------------------
diff --git a/include/trainer/server.h b/include/trainer/server.h
index d113c7d..e37dab4 100644
--- a/include/trainer/server.h
+++ b/include/trainer/server.h
@@ -8,13 +8,12 @@ using std::shared_ptr;
 namespace singa {
 class Server{
  public:
-  Server(int group_id, int server_id);
-  void Setup(const UpdaterProto& proto, shared_ptr<PMServer::ParamShard> shard,
-    shared_ptr<Dealer> dealer);
+  Server(int thread_id, int group_id, int server_id);
+  void Setup(const UpdaterProto& proto, shared_ptr<PMServer::ParamShard> shard);
   void Run();
 
  protected:
-  int group_id_, server_id_;
+  int thread_id_, group_id_, server_id_;
   shared_ptr<PMServer> pmserver_;
   shared_ptr<Dealer> dealer_;
 };

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
index 0e9f356..09ef49d 100644
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@ -39,11 +39,10 @@ class Performance{
  */
 class Worker {
  public:
-  Worker(int group_id, int worker_id);
+  Worker(int thread_id, int group_id, int worker_id);
   ~Worker(){}
   void Setup(const ModelProto& model, shared_ptr<NeuralNet> train_net,
-      shared_ptr<PMWorker::ParamShard> shard, shared_ptr<Dealer> layer_dealer,
-    shared_ptr<Dealer> param_dealer);
+      shared_ptr<PMWorker::ParamShard> shard);
   void set_test_net(shared_ptr<NeuralNet> test_net){
     test_net_=test_net;
   }
@@ -160,7 +159,7 @@ class Worker {
   void ReceiveBlobs(shared_ptr<NeuralNet> net);
   void SendBlob();
  protected:
-  int group_id_, worker_id_;
+  int thread_id_, group_id_, worker_id_;
   int step_;
   ModelProto modelproto_;
   shared_ptr<PMWorker> pmworker_;
@@ -172,7 +171,7 @@ class Worker {
 class BPWorker: public Worker{
  public:
   ~BPWorker(){}
-  BPWorker(int group_id, int worker_id):Worker(group_id, worker_id){}
+  BPWorker(int thread_id, int group_id, int worker_id):Worker(thread_id, group_id, worker_id){}
   virtual void TrainOneBatch(int step);
   virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase phase);
   void Forward(shared_ptr<NeuralNet> net, int step, bool training);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index f5877c5..5431955 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -9,20 +9,20 @@
 
 
 namespace singa {
-Server::Server(int group_id, int server_id):
-  group_id_(group_id), server_id_(server_id){}
+Server::Server(int thread_id, int group_id, int server_id):
+  thread_id_(thread_id), group_id_(group_id), server_id_(server_id){}
 
 void Server::Setup(const UpdaterProto& proto,
-    shared_ptr<PMServer::ParamShard> shard,
-    shared_ptr<Dealer> dealer){
+    shared_ptr<PMServer::ParamShard> shard){
 	//VLOG(3) << "Parsing config file for host "<<hosts[id_] << " server id = " <<id_;
   pmserver_=shared_ptr<PMServer>(Singleton<Factory<PMServer>>::Instance()
       ->Create("PMServer"));
   pmserver_->Setup(group_id_, server_id_, shard, proto);
-  dealer_=dealer;
 }
 
 void Server::Run(){
+  dealer_=std::make_shared<Dealer>(thread_id_*2);
+  dealer_->Connect(kInprocRouterEndpoint);
   Msg* ping=new Msg();
   ping->set_src(group_id_, server_id_, kServer);
   ping->set_dst(0,0,kStub);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 0a1edc8..89e97f1 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -45,6 +45,7 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
     int procs_id){
   RegisterDefaultClasses(mproto);
 
+  int nthreads=1;
   auto cluster=Cluster::Get(cproto, procs_id);
   // create servers
   vector<shared_ptr<Server>> servers;
@@ -59,10 +60,8 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
     // the ParamShard for servers consists of a dictionary of Param objects
     auto shard=make_shared<PMServer::ParamShard>();
     for(int sid=start;sid<end;sid++){
-      auto server=make_shared<Server>(gid, sid);
-      auto dealer=make_shared<Dealer>(nSocket++);
-      dealer->Connect(kInprocRouterEndpoint);
-      server->Setup(mproto.updater(), shard, dealer);
+      auto server=make_shared<Server>(nthreads++, gid, sid);
+      server->Setup(mproto.updater(), shard);
       servers.push_back(server);
     }
   }
@@ -129,15 +128,11 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
       for(int wid=wstart;wid<wend;wid++){
         shared_ptr<Worker> worker=nullptr;
         if(mproto.alg()==ModelProto_GradCalcAlg_kBackPropagation)
-          worker=make_shared<BPWorker>(gid, wid);
+          worker=make_shared<BPWorker>(nthreads++,gid, wid);
         else{
         // TODO add CDWorker
         }
-        auto layer_dealer=make_shared<Dealer>(nSocket++);
-        auto param_dealer=make_shared<Dealer>(nSocket++);
-        layer_dealer->Connect(kInprocRouterEndpoint);
-        param_dealer->Connect(kInprocRouterEndpoint);
-        worker->Setup(mproto, train_net, shard, layer_dealer, param_dealer);
+        worker->Setup(mproto, train_net, shard);
         worker->set_test_net(test_net);
         worker->set_validation_net(validation_net);
         workers.push_back(worker);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index a290996..138c954 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -8,40 +8,37 @@
 #include "proto/model.pb.h"
 using std::thread;
 namespace singa {
-Worker::Worker( int group_id, int worker_id):
-  group_id_(group_id), worker_id_(worker_id){
+Worker::Worker(int thread_id,  int group_id, int worker_id):
+  thread_id_(thread_id),group_id_(group_id), worker_id_(worker_id){
   }
 
 void Worker::Setup(const ModelProto& model,
     shared_ptr<NeuralNet> train_net,
-    shared_ptr<PMWorker::ParamShard> shard,
-    shared_ptr<Dealer> layer_dealer,
-    shared_ptr<Dealer> param_dealer){
+    shared_ptr<PMWorker::ParamShard> shard){
   train_net_=train_net;
   modelproto_=model;
-  layer_dealer_=layer_dealer;
-  param_dealer_=param_dealer;
-  if(layer_dealer_!=nullptr)
-    layer_poller_.Add(layer_dealer_.get());
-  if(param_dealer_!=nullptr)
-    param_poller_.Add(param_dealer_.get());
   pmworker_=shared_ptr<PMWorker>(Singleton<Factory<PMWorker>>::Instance()
       ->Create("PMWorker"));
   pmworker_->Setup(group_id_, worker_id_, shard);
   step_=modelproto_.step();
+}
+
+void Worker::Run(){
+  param_dealer_=std::make_shared<Dealer>(thread_id_*2+1);
+  param_dealer_->Connect(kInprocRouterEndpoint);
+  //layer_dealer_=std::make_shared<Dealer>(thread_id_*2);
   // init params
-  for(auto layer: train_net->layers())
+  for(auto layer: train_net_->layers())
     if(group_id_==0&&layer->locationid()==worker_id_)
       for(auto param: layer->GetParams()){
         if(param->owner()<0||param->owner()==param->id()){
           param->Init();
           Put(param, step_);
         }
-        Get(param, step_);
+        else
+          Get(param, step_);
       }
-}
 
-void Worker::Run(){
   step_=modelproto_.step();
   Performance perf(train_net_);
   while(!StopNow(step_)){