You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/05/12 08:00:24 UTC
incubator-singa git commit: fix bugs from zmq sockets: move socket
creation to sub threads.
Repository: incubator-singa
Updated Branches:
refs/heads/master 9ca741712 -> 06f85e23e
fix bugs from zmq sockets: move socket creation to sub threads.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/06f85e23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/06f85e23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/06f85e23
Branch: refs/heads/master
Commit: 06f85e23eb0a5c969b7758716cf4090b4de302ce
Parents: 9ca7417
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Tue May 12 13:58:02 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Tue May 12 13:58:02 2015 +0800
----------------------------------------------------------------------
examples/cifar10/Makefile | 2 +-
include/trainer/server.h | 7 +++----
include/trainer/worker.h | 9 ++++-----
src/trainer/server.cc | 10 +++++-----
src/trainer/trainer.cc | 15 +++++----------
src/trainer/worker.cc | 27 ++++++++++++---------------
6 files changed, 30 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/examples/cifar10/Makefile
----------------------------------------------------------------------
diff --git a/examples/cifar10/Makefile b/examples/cifar10/Makefile
index 40fece6..16c329f 100644
--- a/examples/cifar10/Makefile
+++ b/examples/cifar10/Makefile
@@ -5,7 +5,7 @@ libs :=singa glog protobuf
download: cifar-10-binary-bin
cifar-10-binary-bin:
- wget http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz
+ #wget http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz
tar xf cifar-10-binary.tar.gz
create:
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/include/trainer/server.h
----------------------------------------------------------------------
diff --git a/include/trainer/server.h b/include/trainer/server.h
index d113c7d..e37dab4 100644
--- a/include/trainer/server.h
+++ b/include/trainer/server.h
@@ -8,13 +8,12 @@ using std::shared_ptr;
namespace singa {
class Server{
public:
- Server(int group_id, int server_id);
- void Setup(const UpdaterProto& proto, shared_ptr<PMServer::ParamShard> shard,
- shared_ptr<Dealer> dealer);
+ Server(int thread_id, int group_id, int server_id);
+ void Setup(const UpdaterProto& proto, shared_ptr<PMServer::ParamShard> shard);
void Run();
protected:
- int group_id_, server_id_;
+ int thread_id_, group_id_, server_id_;
shared_ptr<PMServer> pmserver_;
shared_ptr<Dealer> dealer_;
};
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
index 0e9f356..09ef49d 100644
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@ -39,11 +39,10 @@ class Performance{
*/
class Worker {
public:
- Worker(int group_id, int worker_id);
+ Worker(int thread_id, int group_id, int worker_id);
~Worker(){}
void Setup(const ModelProto& model, shared_ptr<NeuralNet> train_net,
- shared_ptr<PMWorker::ParamShard> shard, shared_ptr<Dealer> layer_dealer,
- shared_ptr<Dealer> param_dealer);
+ shared_ptr<PMWorker::ParamShard> shard);
void set_test_net(shared_ptr<NeuralNet> test_net){
test_net_=test_net;
}
@@ -160,7 +159,7 @@ class Worker {
void ReceiveBlobs(shared_ptr<NeuralNet> net);
void SendBlob();
protected:
- int group_id_, worker_id_;
+ int thread_id_, group_id_, worker_id_;
int step_;
ModelProto modelproto_;
shared_ptr<PMWorker> pmworker_;
@@ -172,7 +171,7 @@ class Worker {
class BPWorker: public Worker{
public:
~BPWorker(){}
- BPWorker(int group_id, int worker_id):Worker(group_id, worker_id){}
+ BPWorker(int thread_id, int group_id, int worker_id):Worker(thread_id, group_id, worker_id){}
virtual void TrainOneBatch(int step);
virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase phase);
void Forward(shared_ptr<NeuralNet> net, int step, bool training);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index f5877c5..5431955 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -9,20 +9,20 @@
namespace singa {
-Server::Server(int group_id, int server_id):
- group_id_(group_id), server_id_(server_id){}
+Server::Server(int thread_id, int group_id, int server_id):
+ thread_id_(thread_id), group_id_(group_id), server_id_(server_id){}
void Server::Setup(const UpdaterProto& proto,
- shared_ptr<PMServer::ParamShard> shard,
- shared_ptr<Dealer> dealer){
+ shared_ptr<PMServer::ParamShard> shard){
//VLOG(3) << "Parsing config file for host "<<hosts[id_] << " server id = " <<id_;
pmserver_=shared_ptr<PMServer>(Singleton<Factory<PMServer>>::Instance()
->Create("PMServer"));
pmserver_->Setup(group_id_, server_id_, shard, proto);
- dealer_=dealer;
}
void Server::Run(){
+ dealer_=std::make_shared<Dealer>(thread_id_*2);
+ dealer_->Connect(kInprocRouterEndpoint);
Msg* ping=new Msg();
ping->set_src(group_id_, server_id_, kServer);
ping->set_dst(0,0,kStub);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 0a1edc8..89e97f1 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -45,6 +45,7 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
int procs_id){
RegisterDefaultClasses(mproto);
+ int nthreads=1;
auto cluster=Cluster::Get(cproto, procs_id);
// create servers
vector<shared_ptr<Server>> servers;
@@ -59,10 +60,8 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
// the ParamShard for servers consists of a dictionary of Param objects
auto shard=make_shared<PMServer::ParamShard>();
for(int sid=start;sid<end;sid++){
- auto server=make_shared<Server>(gid, sid);
- auto dealer=make_shared<Dealer>(nSocket++);
- dealer->Connect(kInprocRouterEndpoint);
- server->Setup(mproto.updater(), shard, dealer);
+ auto server=make_shared<Server>(nthreads++, gid, sid);
+ server->Setup(mproto.updater(), shard);
servers.push_back(server);
}
}
@@ -129,15 +128,11 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
for(int wid=wstart;wid<wend;wid++){
shared_ptr<Worker> worker=nullptr;
if(mproto.alg()==ModelProto_GradCalcAlg_kBackPropagation)
- worker=make_shared<BPWorker>(gid, wid);
+ worker=make_shared<BPWorker>(nthreads++,gid, wid);
else{
// TODO add CDWorker
}
- auto layer_dealer=make_shared<Dealer>(nSocket++);
- auto param_dealer=make_shared<Dealer>(nSocket++);
- layer_dealer->Connect(kInprocRouterEndpoint);
- param_dealer->Connect(kInprocRouterEndpoint);
- worker->Setup(mproto, train_net, shard, layer_dealer, param_dealer);
+ worker->Setup(mproto, train_net, shard);
worker->set_test_net(test_net);
worker->set_validation_net(validation_net);
workers.push_back(worker);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index a290996..138c954 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -8,40 +8,37 @@
#include "proto/model.pb.h"
using std::thread;
namespace singa {
-Worker::Worker( int group_id, int worker_id):
- group_id_(group_id), worker_id_(worker_id){
+Worker::Worker(int thread_id, int group_id, int worker_id):
+ thread_id_(thread_id),group_id_(group_id), worker_id_(worker_id){
}
void Worker::Setup(const ModelProto& model,
shared_ptr<NeuralNet> train_net,
- shared_ptr<PMWorker::ParamShard> shard,
- shared_ptr<Dealer> layer_dealer,
- shared_ptr<Dealer> param_dealer){
+ shared_ptr<PMWorker::ParamShard> shard){
train_net_=train_net;
modelproto_=model;
- layer_dealer_=layer_dealer;
- param_dealer_=param_dealer;
- if(layer_dealer_!=nullptr)
- layer_poller_.Add(layer_dealer_.get());
- if(param_dealer_!=nullptr)
- param_poller_.Add(param_dealer_.get());
pmworker_=shared_ptr<PMWorker>(Singleton<Factory<PMWorker>>::Instance()
->Create("PMWorker"));
pmworker_->Setup(group_id_, worker_id_, shard);
step_=modelproto_.step();
+}
+
+void Worker::Run(){
+ param_dealer_=std::make_shared<Dealer>(thread_id_*2+1);
+ param_dealer_->Connect(kInprocRouterEndpoint);
+ //layer_dealer_=std::make_shared<Dealer>(thread_id_*2);
// init params
- for(auto layer: train_net->layers())
+ for(auto layer: train_net_->layers())
if(group_id_==0&&layer->locationid()==worker_id_)
for(auto param: layer->GetParams()){
if(param->owner()<0||param->owner()==param->id()){
param->Init();
Put(param, step_);
}
- Get(param, step_);
+ else
+ Get(param, step_);
}
-}
-void Worker::Run(){
step_=modelproto_.step();
Performance perf(train_net_);
while(!StopNow(step_)){