Posted to commits@singa.apache.org by wa...@apache.org on 2015/06/26 14:51:03 UTC

incubator-singa git commit: SINGA-24 Implement Downpour training framework

Repository: incubator-singa
Updated Branches:
  refs/heads/master 9dc0567a1 -> 14ce5d9ae


SINGA-24 Implement Downpour training framework

The Downpour training framework has multiple worker groups and a single server group.
Note: the servers' Param slices share memory space with the local workers.
If a local worker does not belong to group 0 (which issues the Put requests) but has Param slices
hosted on local servers, it must pass the pointers to the shared slices' memory space to the local
servers in its Get requests, so the servers can adopt that memory (see the sketch below).

Tested with server_worker_separate set to true and false, and with server/worker groups containing one or more workers.
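
For context, a minimal self-contained sketch of that pointer hand-off. The structs-free layout and
the names worker_slice/server_buf below are illustrative stand-ins, not SINGA's actual Msg/Param API;
only the " %d %p " frame format and the memcpy-then-adopt step mirror the Param::GenGetMsg /
Param::HandleGetMsg changes in src/utils/param.cc further down.

    // sketch_shared_slice.cc -- illustrative only; not SINGA's real Msg/Param code.
    #include <cassert>
    #include <cstdio>
    #include <cstring>
    #include <vector>

    int main() {
      // Worker side: slice memory owned by the worker's Param blob.
      std::vector<float> worker_slice(4, 0.0f);

      // GenGetMsg: encode the copy flag and the slice pointer into one frame
      // as " %d %p " (cf. Param::GenGetMsg in the diff).
      char frame[32];
      std::snprintf(frame, sizeof(frame), " %d %p ", /*copy=*/0,
                    static_cast<void*>(worker_slice.data()));

      // Server side: the server currently holds the slice in its own buffer.
      std::vector<float> server_buf{1.0f, 2.0f, 3.0f, 4.0f};
      float* server_data = server_buf.data();

      // HandleGetMsg: decode the frame; if the worker's pointer differs from
      // the server's current data pointer, copy the values over and adopt the
      // worker's memory so later updates are shared in place instead of copied.
      int copy;
      void* ptr;
      std::sscanf(frame, " %d %p ", &copy, &ptr);
      if (!copy && ptr != static_cast<void*>(server_data)) {
        std::memcpy(ptr, server_data, server_buf.size() * sizeof(float));
        server_data = static_cast<float*>(ptr);  // server now writes into the worker's slice memory
      }

      assert(worker_slice[2] == 3.0f);  // worker sees the server's values without a reply copy
      std::printf("shared slice adopted, worker_slice[2]=%f\n", worker_slice[2]);
      return 0;
    }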


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/14ce5d9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/14ce5d9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/14ce5d9a

Branch: refs/heads/master
Commit: 14ce5d9aee5d9c4a2cd6c7bc3a64ae3df4f5902f
Parents: 9dc0567
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Fri Jun 26 16:46:10 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Fri Jun 26 16:53:57 2015 +0800

----------------------------------------------------------------------
 include/trainer/trainer.h |  2 +-
 include/utils/blob.h      |  1 +
 src/trainer/server.cc     |  5 +++--
 src/trainer/trainer.cc    | 40 +++++++++++++++++++++-------------------
 src/trainer/worker.cc     |  7 ++++---
 src/utils/blob.cc         |  2 +-
 src/utils/cluster.cc      |  2 +-
 src/utils/param.cc        | 30 +++++++++++++++++++-----------
 8 files changed, 51 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/14ce5d9a/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 31d3704..c19a0ae 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -44,7 +44,7 @@ typedef struct HandleContext_{
 class ParamInfo{
    public:
   ParamInfo(shared_ptr<Param> p,int local, int owner):
-    num_update(0), next_version(0),num_local(local), num_total(1),
+    num_update(0), next_version(-1),num_local(local), num_total(1),
     owner_procs(owner){
       shares.push_back(p);
     }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/14ce5d9a/include/utils/blob.h
----------------------------------------------------------------------
diff --git a/include/utils/blob.h b/include/utils/blob.h
index 97b4ee7..dd01dcb 100644
--- a/include/utils/blob.h
+++ b/include/utils/blob.h
@@ -42,6 +42,7 @@
 #define INCLUDE_UTILS_BLOB_
 #include <memory>
 #include <vector>
+//#include <atomic>
 #include <glog/logging.h>
 #include "proto/common.pb.h"
 using std::shared_ptr;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/14ce5d9a/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 14b25da..21ff21a 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -25,7 +25,7 @@ void Server::Setup(const UpdaterProto& proto,
 }
 
 void Server::Run(){
-  LOG(INFO)<<"Server (group_id= "<<group_id_<<", id="<<server_id_<<") starts";
+  LOG(ERROR)<<"Server (group_id= "<<group_id_<<", id="<<server_id_<<") starts";
   dealer_=std::make_shared<Dealer>(2*thread_id_);
   dealer_->Connect(kInprocRouterEndpoint);
   auto cluster=Cluster::Get();
@@ -64,7 +64,7 @@ void Server::Run(){
       if(shard_->find(pid)==shard_->end()){
         // delay the processing by re-queue the msg.
         response=msg;
-        DLOG(ERROR)<<"Requeue msg";
+        //LOG(INFO)<<"Requeue msg"<<type;
       }else if(type == kSyncReminder){
         DeleteMsg(&msg);
         if(syncEntry>=master_params.size())
@@ -133,6 +133,7 @@ Msg* Server::HandlePut(Msg **msg){
   param->set_version(version);
   param->set_local_version(version);
   param->set_id(pid);
+  //LOG(ERROR)<<"put norm "<<param->data().asum_data()<<", "<<pid;
   if(Cluster::Get()->nserver_groups()>1 &&
       slice2group_[pid]!=group_id_){
     last_data_[pid]=std::make_shared<Blob<float>>();

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/14ce5d9a/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 58e568e..c12ff84 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -131,7 +131,7 @@ vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads,
     pid-=cluster->nworker_procs();
   int gid=pid*cluster->nservers_per_procs()/cluster->nservers_per_group();
   int start=pid*cluster->nservers_per_procs()%cluster->nservers_per_group();
-  int end=start+cluster->nservers_per_group();
+  int end=start+cluster->nservers_per_procs();
   // the ServerShard for servers consists of a dictionary of Param objects
   server_shard_=make_shared<ServerShard>();
   auto slice2group=PartitionSlice(cluster->nserver_groups(), slices);
@@ -154,6 +154,16 @@ vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads,
 vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads,
     const ModelProto& mproto, vector<int> *slice_size){
   auto cluster=Cluster::Get();
+  auto net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain,
+      cluster->nworkers_per_group());
+  int lcm=LeastCommonMultiple(cluster->nserver_groups(), cluster->nservers_per_group());
+  auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size
+  for(auto param: net->params()){
+    if(param->id()==param->owner())
+      for(auto entry: paramid2slices[param->id()])
+        slice_size->push_back(entry.second);
+  }
+
   vector<shared_ptr<Worker>> workers;
   if(!cluster->has_worker())
     return workers;
@@ -176,16 +186,6 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads,
     wstart=0;
     wend=cluster->nworkers_per_group();
   }
-  auto net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain,
-      cluster->nworkers_per_group());
-  int lcm=LeastCommonMultiple(cluster->nserver_groups(), cluster->nservers_per_group());
-  auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size
-  for(auto param: net->params()){
-    if(param->id()==param->owner())
-      for(auto entry: paramid2slices[param->id()])
-        slice_size->push_back(entry.second);
-  }
-
   for(int gid=gstart;gid<gend;gid++){
     shared_ptr<NeuralNet> train_net, test_net, validation_net;
     if(gid==gstart)
@@ -415,8 +415,6 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
         }else{
           dst_procs_id=cluster->ProcsIDOf(msg->dst_first(),
               msg->dst_second(), msg->dst_flag());
-          if(type==kSync)
-            LOG(ERROR)<<msg->dst_first()<<","<<msg->dst_second()<<","<<dst_procs_id;
         }
         if(dst_procs_id!=procs_id_){
           // forward to other procs
@@ -425,7 +423,7 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
             interprocs_dealers[dst_procs_id]=dealer;
             while(cluster->endpoint(dst_procs_id)==""){
               std::this_thread::sleep_for(
-                  std::chrono::milliseconds(kCollectSleepTime));
+                  std::chrono::milliseconds(3000));//kCollectSleepTime));
               LOG(ERROR)<<"waiting for procs "<< dst_procs_id<<" to register";
             }
             dealer->Connect("tcp://"+cluster->endpoint(dst_procs_id));
@@ -435,6 +433,7 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
             amount=0;
           }
           amount+=msg->size();
+          //LOG(ERROR)<<"send inter msg of type "<<msg->type();
           interprocs_dealers[dst_procs_id]->Send(&msg);
         }else{
           if(type==kSyncRequest){
@@ -458,8 +457,7 @@ Msg* Trainer::HandleConnect(Msg** msg){
   reply->SetAddr(*msg);
   reply->add_frame("PONG", 4);
   reply->set_type(kConnect);
-  delete *msg;
-  *msg=NULL;
+  DeleteMsg(msg);
   return reply;
 }
 const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){
@@ -467,11 +465,15 @@ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){
   vector<Msg*> replies;
   int version=msgg->trgt_third();
   if(msgg->src_flag()==kStub){
+    LOG(FATAL)<<"Not implemented";
+    /*
     if(version<=pi->shares.at(0)->version()){
-      pi->shares.at(0)->HandleGetMsg(msg);
+      replies.push_back(pi->shares.at(0)->HandleGetMsg(msg));
     }else if(version>pi->next_version){
       // reinsert into a msg queue.
+      replies.push_back(mmsg);
     }
+    */
   }else if(version>pi->next_version){
     pi->next_version=version;
     int gid=msgg->src_first();
@@ -480,12 +482,11 @@ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){
     for(int idx=0, id=param->slice_start();idx<param->num_slices();idx++){
       int server=slice2server_[id+idx];
       int procs=Cluster::Get()->ProcsIDOf(group, server, kServer);
-      if(procs!=procs_id_)
-        LOG(ERROR)<<"Copy for update";
       auto x=param->GenGetMsg(procs!=procs_id_, idx);
       x->set_trgt(param->owner(), id+idx, param->local_version()+1);
       x->set_src(procs_id_, gid, kStub);
       x->set_dst(group, server, kServer);
+      //LOG(ERROR)<<"stub handle get for "<<idx+id<<","<<group<<","<<server;
       replies.push_back(x);
     }
   }
@@ -563,6 +564,7 @@ const vector<Msg*> Trainer::HandlePut(shared_ptr<ParamInfo>pi, Msg** msg){
     x->set_src(procs_id_, gid, kStub);
     x->set_dst(group, server, kServer);
     ret.push_back(x);
+    //LOG(ERROR)<<"stub handle put "<<start+idx<<"to "<<group<<","<<server;
   }
   DeleteMsg(msg);
   return ret;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/14ce5d9a/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 37acb14..5d301dc 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -47,7 +47,7 @@ void Worker::ConnectStub(shared_ptr<Dealer> dealer, EntityType type){
 }
 
 void Worker::Run(){
-  LOG(INFO)<<"Worker (group_id= "<<group_id_<<", id="<<worker_id_<<") starts";
+  LOG(ERROR)<<"Worker (group_id= "<<group_id_<<", id="<<worker_id_<<") starts";
   dealer_=make_shared<Dealer>(2*thread_id_);
   ConnectStub(dealer_, kWorkerParam);
   for(auto layer: train_net_->layers())
@@ -67,8 +67,9 @@ void Worker::Run(){
         if(param->owner() == param->id()){
           if(group_id_%Cluster::Get()->nworker_groups_per_server_group()==0)
             param->InitValues(0);
-          else
+          else{
             Get(param, modelproto_.warmup_steps());
+          }
         }
       }
   }
@@ -277,7 +278,7 @@ void BPWorker::Backward(int step, shared_ptr<NeuralNet> net){
         // receive grad blobs
       }
       layer->ComputeGradient();
-      if(DisplayDebugInfo(step)&&layer->mutable_grad(nullptr)!=nullptr){
+      if(layer->mutable_grad(nullptr)!=nullptr&&DisplayDebugInfo(step)){
         LOG(INFO)<<StringPrintf("Backward layer %10s grad norm1 %13.9f\t",
             layer->name().c_str(), layer->grad(nullptr).asum_data());
         for(shared_ptr<Param> p: layer->GetParams())

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/14ce5d9a/src/utils/blob.cc
----------------------------------------------------------------------
diff --git a/src/utils/blob.cc b/src/utils/blob.cc
index 92fc989..ae4c1b0 100644
--- a/src/utils/blob.cc
+++ b/src/utils/blob.cc
@@ -189,7 +189,7 @@ void* SyncedMemory::mutable_gpu_data() {
 template <typename Dtype>
 Blob<Dtype>::Blob(const vector<int>& shape)
   // capacity_ must be initialized before calling Reshape
-  : capacity_(0) {
+  : capacity_(0), version_(-1) {
   Reshape(shape);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/14ce5d9a/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 706d2ef..8f8024b 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -39,7 +39,7 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
       procs_ids_[Hash(i,j,kWorkerParam)]=procs;
     }
   }
-  int offset=cluster_.server_worker_separate()? procs:0;
+  int offset=cluster_.server_worker_separate()? procs+1:0;
   ngrps=cluster_.nserver_groups(), grp_size=cluster_.nservers_per_group();
   for(int i=0;i<ngrps;i++){
     for(int j=0;j<grp_size;j++){

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/14ce5d9a/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index 07ad8ce..02d80a1 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -99,8 +99,8 @@ Msg* Param::GenPutMsg(bool copy, int idx){
     sprintf(buf+strlen(buf), " %p ", ptr);
     msg->add_frame(buf, strlen(buf));
   }
-  pending_put_[idx]=true;
-  num_pending_requests_++;
+  //pending_put_[idx]=true;
+  //num_pending_requests_++;
 	return msg;
 }
 
@@ -108,7 +108,8 @@ Msg* Param::GenGetMsg(bool copy, int idx){
   CHECK_LT(idx, num_slices_);
   Msg* msg=new Msg();
   msg->set_type(kGet);
-  char buf[8]; sprintf(buf, " %d ", copy);
+  char buf[32]; sprintf(buf, " %d %p ", copy,
+      data_->cpu_data()+slice_offset_[idx]);
   msg->add_frame(buf, sizeof(buf));
   pending_get_[idx]=true;
   num_pending_requests_++;
@@ -123,7 +124,7 @@ Msg* Param::GenUpdateMsg(bool copy, int idx){
   msg->add_frame(buf, sizeof(buf));
   void* ptr=grad_.mutable_cpu_data()+slice_offset_[idx];
   if(copy){
-    LOG(ERROR)<<"Copy in gen update";
+    //LOG(ERROR)<<"Copy in gen update";
     msg->add_frame(ptr, slice_size_[idx]*sizeof(float));
   }
   else{ // to share values of grad blob
@@ -154,13 +155,11 @@ Msg* Param::HandlePutMsg(Msg** msg){
   vector<int> shape{size};
   ParamProto proto;
   Setup(proto, shape);
-  set_local_version((*msg)->trgt_third());
-  set_version((*msg)->trgt_third());
   if(ptr==nullptr){
     CHECK((*msg)->next_frame());
     CHECK_EQ(size* sizeof(float), (*msg)->frame_size());
     memcpy(mutable_cpu_data(), (*msg)->frame_data(), size*sizeof(float));
-  } else{
+  }else{
     data_->set_cpu_data(ptr);
   }
   DeleteMsg(msg);
@@ -169,10 +168,15 @@ Msg* Param::HandlePutMsg(Msg** msg){
 
 Msg* Param::HandleGetMsg(Msg** msg){
   int copy;
-  sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
+  float* ptr;
+  sscanf(static_cast<char*>((*msg)->frame_data()), " %d %p ", &copy, &ptr);
   (*msg)->next_frame();
   if(copy)
     (*msg)->add_frame(mutable_cpu_data(), sizeof(float)*size());
+  else if(ptr!=data_->cpu_data()){
+    memcpy(ptr, data_->cpu_data(), sizeof(float)*size());
+    data_->set_cpu_data(ptr);
+  }
   // else the mem space is shared among all worker and servers
   (*msg)->SwapAddr();
   (*msg)->set_type(kRGet);
@@ -184,7 +188,7 @@ int Param::ParseUpdateMsg(Msg** msg){
   sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy){
-    LOG(ERROR)<<"Copy in parse update";
+    //LOG(ERROR)<<"Copy in parse update";
     CHECK((*msg)->frame_size());
     memcpy(mutable_cpu_grad(), (*msg)->frame_data(),(*msg)->frame_size());
   }else {// use the same data field of the grad blob
@@ -202,9 +206,12 @@ Msg* Param::GenUpdateResponseMsg(bool copy){
   char buf[8]; sprintf(buf, " %d ", copy);
   msg->add_frame(buf, sizeof(buf));
   if(copy){
-    LOG(ERROR)<<"Copy in gen";
+    //LOG(ERROR)<<"Copy in gen";
+  //  LOG(ERROR)<<"gen copy resonse for "<<id()<<", "<<size();
     msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
   }
+  //  LOG(ERROR)<<"gen share resonse for "<<id()<<", "<<size();
+
   return msg;
 }
 
@@ -237,10 +244,11 @@ void Param::ParseResponseMsg(Msg** msg, int slice_idx){
   sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy){
-    CHECK((*msg)->frame_size());
+        CHECK_EQ((*msg)->frame_size(), slice_size_[slice_idx]*sizeof(float));
     memcpy(mutable_cpu_data()+slice_offset_[slice_idx],
         (*msg)->frame_data(), (*msg)->frame_size());
   }
+  //LOG(ERROR)<<"parse response norm "<<data_->asum_data()<<" of "<<id();
   DeleteMsg(msg);
 }
 }