You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/05/27 16:39:11 UTC

[10/22] incubator-singa git commit: move performance display from worker to stub: workers send performance as msgs to stub; fix bugs caused by multiple threads calling losslayers() of neuralnet by initializing members of neuralnet in the main thread. Then sub-threads only read…

Move performance display from worker to stub: workers send performance metrics as messages to the stub. Fix bugs caused by multiple threads calling losslayers() of neuralnet by initializing the members of neuralnet in the main thread; sub-threads then only read these members, so there are no conflicts from concurrent writes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/a617e6c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/a617e6c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/a617e6c3

Branch: refs/heads/master
Commit: a617e6c3da2ced13b6e6d9c7f9bbcbaae5045a11
Parents: c318a98
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Tue May 26 15:24:26 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Tue May 26 15:24:26 2015 +0800

----------------------------------------------------------------------
 examples/cifar10/cluster.conf |  4 +--
 examples/cifar10/model.conf   |  4 +--
 include/trainer/worker.h      |  2 +-
 include/utils/common.h        | 38 ++++++++++++++++++-------
 src/neuralnet/neuralnet.cc    |  8 ++++--
 src/proto/model.pb.h          |  5 ++--
 src/proto/model.proto         |  1 +
 src/trainer/trainer.cc        | 51 ++++++++++++++++++++++++----------
 src/trainer/worker.cc         | 57 +++++++++++++++++---------------------
 src/utils/cluster_rt.cc       |  4 +--
 10 files changed, 107 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/examples/cifar10/cluster.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/cluster.conf b/examples/cifar10/cluster.conf
index 97c64fd..eaad0c8 100644
--- a/examples/cifar10/cluster.conf
+++ b/examples/cifar10/cluster.conf
@@ -1,6 +1,6 @@
 nworker_groups: 1
 nserver_groups: 1
 nservers_per_group: 1
-nworkers_per_group: 1
-nworkers_per_procs: 1
+nworkers_per_group: 4
+nworkers_per_procs: 4
 workspace: "examples/cifar10/"

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/examples/cifar10/model.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/model.conf b/examples/cifar10/model.conf
index 86b05af..cbed20b 100644
--- a/examples/cifar10/model.conf
+++ b/examples/cifar10/model.conf
@@ -1,7 +1,7 @@
 name: "cifar10-convnet"
 train_steps: 5
-test_steps:0
-test_frequency:0
+test_steps:5
+test_frequency:3
 display_frequency:2
 updater{
   momentum:0.9

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
index 2c7ce04..03e7e30 100644
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@ -56,7 +56,7 @@ class Worker {
     * @param net, neural network
     * @param phase kValidation or kTest.
     */
-  void Test(shared_ptr<NeuralNet> net, int nsteps, bool dispperf);
+  void Test(shared_ptr<NeuralNet> net, int nsteps, const string &prefix);
 
   /**
     * Main function of Worker.

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/include/utils/common.h
----------------------------------------------------------------------
diff --git a/include/utils/common.h b/include/utils/common.h
index 5a59127..1644598 100644
--- a/include/utils/common.h
+++ b/include/utils/common.h
@@ -5,12 +5,9 @@
 #include <gflags/gflags.h>
 #include <google/protobuf/message.h>
 #include <stdarg.h>
-#include <thread>         // std::this_thread::sleep_for
-#include <chrono>
 #include <string>
 #include <vector>
-#include <mutex>
-#include <queue>
+#include <sstream>
 #include <sys/stat.h>
 #include <map>
 
@@ -39,9 +36,11 @@ inline bool check_exists(const std::string& name) {
     return (stat (name.c_str(), &buffer) == 0);
 }
 
+/*
 inline void Sleep(int millisec=1){
   std::this_thread::sleep_for(std::chrono::milliseconds(millisec));
 }
+*/
 
 inline float rand_real(){
   return  static_cast<float>(rand())/(RAND_MAX+1.0f);
@@ -50,11 +49,18 @@ inline float rand_real(){
 class Metric{
  public:
   Metric():counter_(0){}
-  void AddMetric(string name, float value){
-    if(data_.find(name)==data_.end())
-      data_[name]=value;
+  void AddMetric(const string& name, float value){
+    string prefix=name;
+    if(name.find("@")!=string::npos)
+      prefix=name.substr(0, name.find("@"));
+    if(data_.find(prefix)==data_.end())
+      data_[prefix]=value;
     else
-      data_[name]+=value;
+      data_[prefix]+=value;
+  }
+  void AddMetrics(const Metric& other){
+    for(auto& entry: other.data_)
+      AddMetric(entry.first, entry.second);
   }
   void Reset(){
     data_.clear();
@@ -68,12 +74,24 @@ class Metric{
     counter_++;
   }
   const string ToString() const{
-    string disp="";
+    string disp=std::to_string(data_.size())+" fields, ";
     for(const auto& entry: data_){
-      disp+=entry.first+":"+std::to_string(entry.second)+"\t";
+      disp+=entry.first+" : "+std::to_string(entry.second)+"\t";
     }
     return disp;
   }
+  void ParseString(const string & perf) {
+    std::stringstream stream(perf);
+    int n;
+    string str;
+    stream>>n>>str;
+    for(int i=0;i<n;i++){
+      float f;
+      string sep;
+      stream>>str>>sep>>f;
+      data_[str]=f;
+    }
+  }
  private:
   map<string, float> data_;
   int counter_;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
index ca371f6..b88d4a6 100644
--- a/src/neuralnet/neuralnet.cc
+++ b/src/neuralnet/neuralnet.cc
@@ -85,6 +85,11 @@ NeuralNet::NeuralNet(NetProto net_proto, int group_size) {
     }
   }
   LOG(INFO)<<"Neural Net constructed";
+  // init all data members to avoid conflicts from multi-thread access
+  losslayers();
+  paramid2param(0);
+  datalayers();
+  parserlayers();
 }
 
 void NeuralNet::ConstructNeuralNet(const NetProto& net_proto){
@@ -249,7 +254,7 @@ Graph NeuralNet::CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers,
       for(int i=0;i<gsize;i++){
         sprintf(suffix, "%02d", i);
         // differentiate partitions
-        string nodename=layer->name()+"-"+string(suffix);
+        string nodename=layer->name()+"@"+string(suffix);
         auto node=graph.AddNode(nodename, LayerInfo{layer->name(), i,-1,-1});
         nodes.push_back(node);
       }
@@ -263,7 +268,6 @@ Graph NeuralNet::CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers,
     layer2nodes[layer->name()]=nodes;
   }
 
-
   // connect nodes, nodes for ConcateLayer and SliceLayer are added.
   for(shared_ptr<Layer> layer: layers){
     string name=layer->name();

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/src/proto/model.pb.h
----------------------------------------------------------------------
diff --git a/src/proto/model.pb.h b/src/proto/model.pb.h
index 9770964..9ac1e54 100644
--- a/src/proto/model.pb.h
+++ b/src/proto/model.pb.h
@@ -193,11 +193,12 @@ enum MsgType {
   kData = 7,
   kRGet = 8,
   kRUpdate = 9,
-  kConnect = 10
+  kConnect = 10,
+  kMetric = 11
 };
 bool MsgType_IsValid(int value);
 const MsgType MsgType_MIN = kGet;
-const MsgType MsgType_MAX = kConnect;
+const MsgType MsgType_MAX = kMetric;
 const int MsgType_ARRAYSIZE = MsgType_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* MsgType_descriptor();

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/src/proto/model.proto
----------------------------------------------------------------------
diff --git a/src/proto/model.proto b/src/proto/model.proto
index 5dcf9c0..59c1a52 100644
--- a/src/proto/model.proto
+++ b/src/proto/model.proto
@@ -11,6 +11,7 @@ enum MsgType{
   kRGet=8;
   kRUpdate=9;
   kConnect=10;
+  kMetric=11;
 };
 
 enum EntityType{

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 37e9883..02e60a4 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -76,13 +76,10 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
         auto server=make_shared<Server>(nthreads++, gid, sid);
         server->Setup(mproto.updater(), shard);
         servers.push_back(server);
-        HandleContext hc;
-        hc.dealer=dealer;
-        hc.group_id=gid;
-        hc.id=sid;
+        HandleContext hc{dealer, gid, sid};
         ctx.push_back(hc);
-        cluster->runtime()->sWatchSGroup(gid, sid, HandleWorkerFinish,
-            &ctx.back());
+        CHECK(cluster->runtime()->sWatchSGroup(gid, sid, HandleWorkerFinish,
+            &ctx.back()));
       }
     }
   }
@@ -193,6 +190,9 @@ void Trainer::Run(int nworkers, int nservers,
     router->Bind(cluster->endpoint());
 
   map<int, shared_ptr<Dealer>> interprocs_dealers;
+  Metric perf;
+  int perf_step=-1;
+  string perf_prefix;
   bool stop=false;
   while(!stop){
     Msg* msg=router->Receive();
@@ -218,8 +218,27 @@ void Trainer::Run(int nworkers, int nservers,
             stop=true;
             break;
           }
-          LOG(ERROR)<<"Stub recv Stop";
-        }else{
+        }else if(type==kMetric){
+          int step=msg->target_first();
+          string prefix((char*)msg->frame_data(), msg->frame_size());
+          if(step!=perf_step||perf_prefix!=prefix){
+            if(perf_step>=0){
+              perf.Avg();
+              LOG(ERROR)<<perf_prefix<<" step-"
+                <<perf_step<<", "<<perf.ToString();
+              perf.Reset();
+            }
+            perf_step=step;
+            perf_prefix=prefix;
+          }
+          msg->next_frame();
+          Metric cur;
+          cur.ParseString(string((char*)msg->frame_data(), msg->frame_size()));
+          perf.AddMetrics(cur);
+          perf.Inc();
+          delete msg;
+          msg=nullptr;
+        }else {
           int group_id=msg->src_first();
           int paramid=msg->target_first();
           auto entry=shards.at(group_id)->at(paramid);
@@ -251,19 +270,21 @@ void Trainer::Run(int nworkers, int nservers,
           dst_procs_id=ProcsIDOf(msg->dst_first(), msg->dst_second(), msg->dst_flag());
         }
         if(dst_procs_id!=procs_id_){
-          /*
-             // forward to other procs
-             if (interprocs_dealers.find(procs_id)==interprocs_dealers.end())
-             interprocs_dealers[procs_id]=make_shared<Dealer>(procs_id);
-             interprocs_dealers[procs_id]->Send(&msg);
-             */
+        /*
+          // forward to other procs
+          if (interprocs_dealers.find(procs_id)==interprocs_dealers.end())
+          interprocs_dealers[procs_id]=make_shared<Dealer>(procs_id);
+          interprocs_dealers[procs_id]->Send(&msg);
+          */
         }else{
           router->Send(&msg);
         }
       }
     }
   }
-  LOG(ERROR)<<"Stub finishes";
+  perf.Avg();
+  if(perf_step>=0)
+    LOG(ERROR)<<perf_prefix<<" step-"<<perf_step<<", "<<perf.ToString();
 }
 Msg* Trainer::HandleConnect(Msg** msg){
   string ping((char*)(*msg)->frame_data(), (*msg)->frame_size());

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index f0b54ea..955ee29 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -21,7 +21,7 @@ void Worker::Setup(const ModelProto& model,
   modelproto_=model;
   auto cluster=Cluster::Get();
   int sgid=group_id_/cluster->nworker_groups_per_server_group();
-  cluster->runtime()->wJoinSGroup(group_id_, worker_id_, sgid);
+  CHECK(cluster->runtime()->wJoinSGroup(group_id_, worker_id_, sgid));
 }
 
 void Worker::Run(){
@@ -150,28 +150,28 @@ int Worker::Collect(shared_ptr<Param> param, int step){
   return 1;
 }
 const void Worker::DisplayPerformance(const Metric & perf, const string& prefix){
-  /* TODO send perf to Stub thread for printing
-     Msg* msg=new Msg();
-     msg->set_src(group_id_, worker_id_, kWorkerParam);
-     msg->set_dst(-1,-1, kStub);
-     msg->set_type(kMetric);
-     const string disp=perf.ToString();
-     msg->AddFrame(disp.c_str(), disp.length());
-     param_dealer_->Send(&msg);
-     */
-  LOG(ERROR)<<prefix<<" "<<perf.ToString();
+  Msg* msg=new Msg();
+  msg->set_src(group_id_, worker_id_, kWorkerParam);
+  msg->set_dst(-1,-1, kStub);
+  msg->set_type(kMetric);
+  msg->set_target(step_,0);
+  const string disp=perf.ToString();
+  msg->add_frame(prefix.c_str(), prefix.length());
+  msg->add_frame(disp.c_str(), disp.length());
+  param_dealer_->Send(&msg);
+  //LOG(ERROR)<<prefix<<" "<<perf.ToString();
 }
 
 void Worker::RunOneBatch(int step, Metric* perf){
   if(ValidateNow(step)){
-    LOG(ERROR)<<"Validation at step "<<step;
+    //LOG(ERROR)<<"Validation at step "<<step;
     CollectAll(validation_net_, step);
-    Test(validation_net_, modelproto_.validation_steps(), perf!=nullptr);
+    Test(validation_net_, modelproto_.validation_steps(), "Validation");
   }
   if(TestNow(step)){
-    LOG(ERROR)<<"Test at step "<<step;
+    //LOG(ERROR)<<"Test at step "<<step;
     CollectAll(test_net_, step);
-    Test(test_net_, modelproto_.test_steps(), perf!=nullptr);
+    Test(test_net_, modelproto_.test_steps(), "Test");
   }
   TrainOneBatch(step);
   if(perf!=nullptr){
@@ -180,13 +180,13 @@ void Worker::RunOneBatch(int step, Metric* perf){
       if(layer->partitionid()==worker_id_){
         const float * ptr=layer->metric().cpu_data();
         for(int j=0;j<layer->metric().count();j++)
-          perf->AddMetric(layer->name()+"-"+std::to_string(j), ptr[j]);
+          perf->AddMetric(std::to_string(j)+"#"+layer->name(), ptr[j]);
       }
     }
     perf->Inc();
     if(DisplayNow(step)){
       perf->Avg();
-      DisplayPerformance(*perf, "Train at step "+std::to_string(step));
+      DisplayPerformance(*perf, "Train");
       perf->Reset();
     }
   }
@@ -203,27 +203,22 @@ void Worker::ReceiveBlobs(shared_ptr<NeuralNet> net){
 void Worker::SendBlob(){
 }
 
-void Worker::Test(shared_ptr<NeuralNet> net, int nsteps, bool disperf){
+void Worker::Test(shared_ptr<NeuralNet> net, int nsteps, const string& prefix){
   const auto& losslayers=net->losslayers();
   Metric perf;
   for(int step=0;step<nsteps;step++){
     TestOneBatch(net, step, kTest);
-    if(disperf){
-      for(auto layer: losslayers){
-        if(layer->partitionid()==worker_id_){
-          const float * ptr=layer->metric().cpu_data();
-          for(int j=0;j<layer->metric().count();j++)
-            perf.AddMetric(layer->name()+"-"+std::to_string(j), ptr[j]);
-        }
+    for(auto layer: losslayers){
+      if(layer->partitionid()==worker_id_){
+        const float * ptr=layer->metric().cpu_data();
+        for(int j=0;j<layer->metric().count();j++)
+          perf.AddMetric(std::to_string(j)+"#"+layer->name(), ptr[j]);
       }
-      perf.Inc();
     }
+    perf.Inc();
   }
-  if(disperf){
-    perf.Avg();
-    DisplayPerformance(perf, "Test");
-    perf.Reset();
-  }
+  perf.Avg();
+  DisplayPerformance(perf, prefix);
 }
 
 /****************************BPWorker**********************************/

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a617e6c3/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 433623d..6a12ca9 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -83,7 +83,7 @@ bool ZKClusterRT::wJoinSGroup(int gid, int wid, int s_group){
 
   int ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, buf, MAX_BUF_LEN);
   if (ret == ZOK){
-    LOG(ERROR) << "zookeeper node " << buf << " created";
+    LOG(INFO) << "zookeeper node " << buf << " created";
     return true;
   }
   else if (ret == ZNODEEXISTS){
@@ -105,7 +105,7 @@ bool ZKClusterRT::wLeaveSGroup(int gid, int wid, int s_group){
 
   int ret = zoo_delete(zkhandle_, path.c_str(), -1);
   if (ret == ZOK){
-    LOG(ERROR) << "zookeeper node " << path << " deleted";
+    LOG(INFO) << "zookeeper node " << path << " deleted";
     return true;
   }
   else if (ret == ZNONODE){