You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/09/23 17:07:27 UTC

[1/5] incubator-singa git commit: SINGA-21 Code review 5

Repository: incubator-singa
Updated Branches:
  refs/heads/master 79a241c8b -> 0c6e5c692


SINGA-21 Code review 5

review neuralnet.h, neuralnet.cc
 - format code
 - remove unused varaibles: data_layers_, parser_layers, loss_layers_

TODO:
 - re-implement src-dst connection function to make it clear in logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/31611759
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/31611759
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/31611759

Branch: refs/heads/master
Commit: 31611759c474826cbc4592841ad653e8aed04bc1
Parents: 79a241c
Author: wang sheng <wa...@gmail.com>
Authored: Tue Sep 22 14:06:50 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Tue Sep 22 17:28:25 2015 +0800

----------------------------------------------------------------------
 include/neuralnet/neuralnet.h |  62 +++------
 include/trainer/worker.h      |   6 +
 src/neuralnet/neuralnet.cc    | 270 ++++++++++++++++++-------------------
 3 files changed, 155 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/31611759/include/neuralnet/neuralnet.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/neuralnet.h b/include/neuralnet/neuralnet.h
index f3b091a..06fd977 100644
--- a/include/neuralnet/neuralnet.h
+++ b/include/neuralnet/neuralnet.h
@@ -22,28 +22,24 @@
 #ifndef SINGA_NEURALNET_NEURALNET_H_
 #define SINGA_NEURALNET_NEURALNET_H_
 
-#include <vector>
 #include <map>
 #include <memory>
 #include <string>
+#include <vector>
 
-#include "proto/job.pb.h"
 #include "neuralnet/layer.h"
+#include "proto/job.pb.h"
 #include "utils/factory.h"
 #include "utils/graph.h"
 
 namespace singa {
-using std::vector;
-using std::string;
-using std::map;
-using std::shared_ptr;
 
 /**
  * The neural network is constructed from user configurations in NetProto.
  *
  * Some layers, e.g., SplitLayer and BridgeSrcLayer/BridgeDstLayer
  * will be added implicitly to partition the neural network.
- * TODO create wrappers for popular models, e.g., MLP, CNN.
+ * TODO(wangwei) create wrappers for popular models, e.g., MLP, CNN.
  */
 class NeuralNet {
  public:
@@ -53,20 +49,20 @@ class NeuralNet {
    * Parameters for test/validation net can share those from training after
    * setup (done outside of this funcion).
    *
-   * @param np proto for the neural network
+   * @param net_conf proto for the neural network
    * @param phase test/training/validation
-   * @param num num of partitions, do partitioning if num > 1
+   * @param npartitions num of partitions, do partitioning if num > 1
    * @return shared pointer to a neural net
    */
-  static shared_ptr<NeuralNet> Create(const NetProto& np, Phase phase, int num);
+  static std::shared_ptr<NeuralNet> Create(const NetProto& net_conf,
+                                           Phase phase, int npartitions);
 
- public:
   /**
    * construct the net structure from protocol buffer.
    * @param netproto neural net config
    * @param npartitions num of partitions. 1 for no partitioning.
    */
-  explicit NeuralNet(NetProto netproto, int npartitions = 1);
+  NeuralNet(NetProto netproto, int npartitions);
   ~NeuralNet();
   /**
    * To display the adjacency layers
@@ -75,35 +71,16 @@ class NeuralNet {
   /**
    * Share memory of parameter values from other neuralnet
    */
-  void ShareParamsFrom(shared_ptr<NeuralNet> other);
-
-  const std::vector<Layer*>& layers() {
-    return layers_;
-  }
-  const std::vector<ParserLayer*>& parserlayers() const {
-    LOG(FATAL)<< " not implemented";
-    return parserlayers_;
-  }
-  const std::vector<LossLayer*>& losslayers() const {
-    LOG(FATAL)<< " not implemented";
-    return losslayers_;
-  }
-  const std::vector<DataLayer*>& datalayers() const {
-    LOG(FATAL)<< " not implemented";
-    return datalayers_;
-  }
-  const std::vector<Param*>& params() const {
-    return params_;
-  }
-  Layer* name2layer(string name) const {
+  void ShareParamsFrom(std::shared_ptr<NeuralNet> other);
+  inline const std::vector<Layer*>& layers() { return layers_; }
+  inline const std::vector<Param*>& params() const { return params_; }
+  inline Layer* name2layer(std::string name) const {
     if (name2layer_.find(name) != name2layer_.end())
       return name2layer_.at(name);
     else
       return nullptr;
   }
-  Param* paramid2param(int id) const {
-    return paramid2param_.at(id);
-  }
+  inline Param* paramid2param(int id) const { return paramid2param_.at(id); }
 
  protected:
   /**
@@ -126,14 +103,13 @@ class NeuralNet {
   void PrepareDataStructures();
 
  protected:
-  vector<Layer*> layers_;
-  vector<ParserLayer*> parserlayers_;
-  vector<LossLayer*> losslayers_;
-  vector<DataLayer*> datalayers_;
-  vector<Param*> params_;
+  std::vector<Layer*> layers_;
+  std::vector<Param*> params_;
 
-  map<string, Layer*> name2layer_;
-  map<int, Param*> paramid2param_;
+  std::map<std::string, Layer*> name2layer_;
+  std::map<int, Param*> paramid2param_;
 };
+
 }  // namespace singa
+
 #endif  // SINGA_NEURALNET_NEURALNET_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/31611759/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
index 0607385..679435c 100644
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@ -26,6 +26,12 @@
 #include "communication/socket.h"
 
 namespace singa {
+
+using std::map;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+  
 //!< sleep 5 milliseconds if the Param is not updated to the expected version
 const int kCollectSleepTime=5;
 /**

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/31611759/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
index 2f50ec5..286d273 100644
--- a/src/neuralnet/neuralnet.cc
+++ b/src/neuralnet/neuralnet.cc
@@ -26,10 +26,14 @@
 #include "utils/singleton.h"
 
 namespace singa {
-shared_ptr<NeuralNet> NeuralNet::Create(
-    const NetProto& net_conf,
-    Phase phase,
-    int npartitions) {
+
+using std::map;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+shared_ptr<NeuralNet> NeuralNet::Create(const NetProto& net_conf, Phase phase,
+                                        int npartitions) {
   NetProto conf;
   conf.CopyFrom(net_conf);
   conf.clear_layer();
@@ -43,22 +47,21 @@ shared_ptr<NeuralNet> NeuralNet::Create(
       if (p == phase)
         include = false;
     }
-    if (include) {
-      LayerProto* layer_conf = conf.add_layer();
-      layer_conf->CopyFrom(layer);
-      // using net partition if layer partition is not set
-      if (!layer_conf->has_partition_dim())
-        layer_conf->set_partition_dim(net_conf.partition_dim());
-      for (int i = 0; i < layer_conf->param_size(); i++) {
-        ParamProto* param = layer_conf->mutable_param(i);
-        if (param->has_name() && param->name() != "") {
-          CHECK(name2param.find(param->name()) == name2param.end())
-            << "param name is repeated: " << param->name();
-          name2param[param->name()] = param;
-        }
-        if (param->has_share_from() && param->share_from() != "")
-          shares.push_back(param);
+    if (include == false) continue;
+    LayerProto* layer_conf = conf.add_layer();
+    layer_conf->CopyFrom(layer);
+    // using net partition if layer partition is not set
+    if (!layer_conf->has_partition_dim())
+      layer_conf->set_partition_dim(net_conf.partition_dim());
+    for (int i = 0; i < layer_conf->param_size(); i++) {
+      ParamProto* param = layer_conf->mutable_param(i);
+      if (param->has_name() && param->name() != "") {
+        CHECK(name2param.find(param->name()) == name2param.end())
+          << "param name is repeated: " << param->name();
+        name2param[param->name()] = param;
       }
+      if (param->has_share_from() && param->share_from() != "")
+        shares.push_back(param);
     }
   }
   for (auto param : shares) {
@@ -71,19 +74,12 @@ shared_ptr<NeuralNet> NeuralNet::Create(
     param->set_name(name);
     param->set_share_from(from);
   }
-
   LOG(INFO) << "NeuralNet config is\n" << conf.DebugString();
-
   // TODO(wangwei) create net based on net type, e.g., directed, undirected, etc
   auto net = std::make_shared<NeuralNet>(conf, npartitions);
   return net;
 }
 
-NeuralNet::~NeuralNet() {
-  for (auto layer : layers_)
-    delete layer;
-}
-
 NeuralNet::NeuralNet(NetProto netproto, int npartitions) {
   LOG(INFO) << "Constructing Neural Net...";
   auto graph = CreateGraph(netproto, npartitions);
@@ -95,82 +91,34 @@ NeuralNet::NeuralNet(NetProto netproto, int npartitions) {
   LOG(INFO) << "Neural net constructed";
 }
 
-void NeuralNet::CreateNetFromGraph(Graph* graph, int npartitions) {
-  // create one layer per node
-  for (Node* node : graph->nodes()) {
-    auto proto_ptr =  static_cast<LayerProto*>(node->proto);
-    auto layer = Layer::Create(*proto_ptr);
-    layers_.push_back(layer);
-    name2layer_[node->name] = layer;
-  }
-  // connect layers
-  for (Node* node : graph->nodes()) {
-    auto layer = name2layer_[node->name];
-    layer->clear_dstlayers();
-    for (Node* dst : node->dstnodes)
-      layer->add_dstlayer(name2layer_[dst->name]);
-    layer->clear_srclayers();
-    for (Node* src : node->srcnodes)
-      layer->add_srclayer(name2layer_[src->name]);
-  }
-  // setup layers
-  int paramid = 0;
-  map<string, string> layerinfo;
-  map<string, vector<Layer*>> share_param_layers;
-  for (Node* node : graph->nodes()) {
-    auto layer = name2layer_[node->name];
-    layer->Setup(*(static_cast<LayerProto*>(node->proto)), npartitions);
-    LOG(INFO) << "constructing graph: " << layer->name();
-    layerinfo[layer->name()] = IntVecToString(layer->data(nullptr).shape());
-    string param_name = "$";
-    for (auto param : layer->GetParams()) {
-      param->set_id(paramid++);
-      // if user does not name the param, then name it based on layer name.
-      if (param->name() == "") {
-        param->set_name(layer->name() + param_name);
-        param_name += "$";
-      }
-    }
-    if (layer->partition_dim() == 0)
-      share_param_layers[node->origin].push_back(layer);
-  }
-  LOG(INFO) << "Neural net structure\n"  << graph->ToJson(layerinfo);
+NeuralNet::~NeuralNet() {
+  for (auto layer : layers_)
+    delete layer;
+}
 
-  // create map from param name to param ptr
-  std::unordered_map<string, Param*> name2param;
-  for (auto layer : layers_) {
-    for (auto param : layer->GetParams()) {
-      name2param[param->name()] = param;
-    }
-  }
-  for (auto & entry : share_param_layers) {
-    // overwrite entries for replicated params due to layer partition (dim 0).
-    for (auto *param : entry.second.front()->GetParams())
-      name2param.at(param->name()) = param;
+std::string NeuralNet::ToAdjacency() {
+  string disp = "";
+  for (auto& layer : layers_) {
+    disp += layer->name()+": ";
+    for (const auto& dst : layer->dstlayers())
+      disp += dst->name()+", ";
+    disp += "\n";
   }
-  // share params based on share_from field
-  for (auto & entry : name2param) {
-    Param* param = entry.second;
-    const string share_from = param->share_from();
-    if (param->share_from() != "") {
-      if(name2param.find(share_from) != name2param.end()) {
-        param->ShareFrom(*name2param.at(param->share_from()));
-      } else {
-        LOG(FATAL) << "No param with the name (share_from) " << share_from;
+  return disp;
+}
+
+void NeuralNet::ShareParamsFrom(shared_ptr<NeuralNet> other) {
+  for (auto& layer : layers_) {
+    auto otherlayer = other->name2layer(layer->name());
+    if (otherlayer != nullptr) {
+      const auto& otherparams = otherlayer->GetParams();
+      const auto& params = layer->GetParams();
+      CHECK_EQ(params.size(), otherparams.size());
+      for (size_t i = 0; i < params.size(); i++) {
+        params[i]->ShareFrom(*otherparams[i]);
       }
     }
   }
-  // share Params for layers generated (partitioned) from the same origin layer
-  for (auto & entry : share_param_layers) {
-    const auto& owner = entry.second.begin();
-    const auto& owner_params = (*owner)->GetParams();
-    for (auto it = owner + 1; it != entry.second.end(); it++) {
-      auto params = (*it)->GetParams();
-      CHECK_EQ(params.size(), owner_params.size());
-      for (size_t i = 0; i < params.size(); i++)
-        params.at(i)->ShareFrom(*owner_params.at(i));
-    }
-  }
 }
 
 // add a node for SliceLayer between srcnode and dstnodes
@@ -314,23 +262,26 @@ Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) {
         for (Node* node : nodes)
           ConcateNodes(graph, srcnodes, node);
       } else if ((src_pdim == 1 && pdim == 0) || (src_pdim == 0 && pdim == 1)) {
+        // TODO(wangwei) rewrite the whole src-dst construction in a clear way
+        LOG(FATAL) << "not implemented";
         // the most complext scenario
-        vector<Node*> nodes;
-        for (Node* srcnode : srcnodes)
-          nodes.push_back(SliceNode(graph, srcnode, nodes, false));
-        for (Node* node : nodes)
-          ConcateNodes(graph, nodes, node);
+        // vector<Node*> nodes;
+        // for (Node* srcnode : srcnodes)
+        //   nodes.push_back(SliceNode(graph, srcnode, nodes, false));
+        // for (Node* node : nodes)
+        //   ConcateNodes(graph, nodes, node);
       } else if ((src_pdim == 0 && pdim == 0)||
           (src_pdim == 1 && pdim == 1 && connection == kOneToOne)) {
         CHECK_EQ(srcnodes.size(), nodes.size());
         for (size_t i = 0; i < srcnodes.size(); i++)
           graph->AddEdge(srcnodes[i], nodes[i]);
+      } else {
+        LOG(FATAL) << "in wrong branch, not implemented";
       }
     }
   }
   // must do topology sort, because we have added new nodes.
   graph->Sort();
-
   // add nodes for SplitLayer
   vector<Node*> oldnodes = graph->nodes();
   for (Node* node : oldnodes) {
@@ -344,7 +295,6 @@ Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) {
     }
     delete layer;
   }
-
   // add nodes for bridge layers
   for (Node* node : oldnodes) {
     vector<Node*> dstnodes = node->dstnodes;
@@ -363,54 +313,94 @@ Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) {
   return graph;
 }
 
+void NeuralNet::CreateNetFromGraph(Graph* graph, int npartitions) {
+  // create one layer per node
+  for (Node* node : graph->nodes()) {
+    auto proto_ptr = static_cast<LayerProto*>(node->proto);
+    auto layer = Layer::Create(*proto_ptr);
+    layers_.push_back(layer);
+    name2layer_[node->name] = layer;
+  }
+  // connect layers
+  for (Node* node : graph->nodes()) {
+    auto layer = name2layer_[node->name];
+    layer->clear_dstlayers();
+    for (Node* dst : node->dstnodes)
+      layer->add_dstlayer(name2layer_[dst->name]);
+    layer->clear_srclayers();
+    for (Node* src : node->srcnodes)
+      layer->add_srclayer(name2layer_[src->name]);
+  }
+  // setup layers
+  int paramid = 0;
+  map<string, string> layerinfo;
+  map<string, vector<Layer*>> share_param_layers;
+  for (Node* node : graph->nodes()) {
+    auto layer = name2layer_[node->name];
+    layer->Setup(*(static_cast<LayerProto*>(node->proto)), npartitions);
+    LOG(INFO) << "constructing graph: " << layer->name();
+    layerinfo[layer->name()] = IntVecToString(layer->data(nullptr).shape());
+    string param_name = "$";
+    for (auto param : layer->GetParams()) {
+      param->set_id(paramid++);
+      // if user does not name the param, then name it based on layer name.
+      if (param->name() == "") {
+        param->set_name(layer->name() + param_name);
+        param_name += "$";
+      }
+    }
+    if (layer->partition_dim() == 0)
+      share_param_layers[node->origin].push_back(layer);
+  }
+  LOG(INFO) << "Neural net structure\n"  << graph->ToJson(layerinfo);
+  // create map from param name to param ptr
+  std::unordered_map<string, Param*> name2param;
+  for (auto layer : layers_) {
+    for (auto param : layer->GetParams()) {
+      name2param[param->name()] = param;
+    }
+  }
+  for (auto & entry : share_param_layers) {
+    // overwrite entries for replicated params due to layer partition (dim 0).
+    for (auto *param : entry.second.front()->GetParams())
+      name2param.at(param->name()) = param;
+  }
+  // share params based on share_from field
+  for (auto & entry : name2param) {
+    Param* param = entry.second;
+    const string share_from = param->share_from();
+    if (param->share_from() != "") {
+      if (name2param.find(share_from) != name2param.end()) {
+        param->ShareFrom(*name2param.at(param->share_from()));
+      } else {
+        LOG(FATAL) << "No param with the name (share_from) " << share_from;
+      }
+    }
+  }
+  // share Params for layers generated (partitioned) from the same origin layer
+  for (auto & entry : share_param_layers) {
+    const auto& owner = entry.second.begin();
+    const auto& owner_params = (*owner)->GetParams();
+    for (auto it = owner + 1; it != entry.second.end(); it++) {
+      auto params = (*it)->GetParams();
+      CHECK_EQ(params.size(), owner_params.size());
+      for (size_t i = 0; i < params.size(); i++)
+        params.at(i)->ShareFrom(*owner_params.at(i));
+    }
+  }
+}
 
 void NeuralNet::PrepareDataStructures() {
-  parserlayers_.clear();
-  losslayers_.clear();
-  datalayers_.clear();
   params_.clear();
   paramid2param_.clear();
   name2layer_.clear();
-
   for (auto& layer : layers_) {
     name2layer_[layer->name()] = layer;
-    /*
-    if (layer->is_parserlayer())
-      parserlayers_.push_back(static_cast<ParserLayer*>(layer));
-    if (layer->is_losslayer())
-      losslayers_.push_back(static_cast<LossLayer*>(layer));
-    if (layer->is_datalayer())
-      datalayers_.push_back(static_cast<DataLayer*>(layer));
-      */
     for (Param* p : layer->GetParams()) {
       paramid2param_[p->id()] = p;
       params_.push_back(p);
     }
   }
 }
-std::string NeuralNet::ToAdjacency() {
-  string disp = "";
-  for (auto& layer : layers_) {
-    disp += layer->name()+": ";
-    for (const auto& dst : layer->dstlayers())
-      disp += dst->name()+", ";
-    disp += "\n";
-  }
-  return disp;
-}
-
-void NeuralNet::ShareParamsFrom(shared_ptr<NeuralNet> other) {
-  for (auto& layer : layers_) {
-    auto otherlayer = other->name2layer(layer->name());
-    if (otherlayer != nullptr) {
-      const auto& otherparams = otherlayer->GetParams();
-      const auto& params = layer->GetParams();
-      CHECK_EQ(params.size(), otherparams.size());
-      for (size_t i = 0; i < params.size(); i++) {
-        params[i]->ShareFrom(*otherparams[i]);
-      }
-    }
-  }
-}
 
 }  // namespace singa


[4/5] incubator-singa git commit: SINGA-21 Code review 5

Posted by wa...@apache.org.
SINGA-21 Code review 5

review trainer.cc/h, driver.cc/.h, singa.h, main.cc
 - rewrite headers in driver.h
 - move template impl from driver.h to driver.cc
 - format code


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/366e6a82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/366e6a82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/366e6a82

Branch: refs/heads/master
Commit: 366e6a82684aff9c0b31e904e3c45dcca2163490
Parents: f50d293
Author: wang sheng <wa...@gmail.com>
Authored: Wed Sep 23 15:20:20 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Wed Sep 23 15:28:43 2015 +0800

----------------------------------------------------------------------
 include/driver.h           |  45 +-------
 include/trainer/trainer.h  |  80 ++++++-------
 src/driver.cc              |  52 ++++++++-
 src/main.cc                |  10 +-
 src/neuralnet/neuralnet.cc |   4 +-
 src/trainer/trainer.cc     | 250 +++++++++++++++++++---------------------
 6 files changed, 211 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/366e6a82/include/driver.h
----------------------------------------------------------------------
diff --git a/include/driver.h b/include/driver.h
index 7d15c98..563be77 100644
--- a/include/driver.h
+++ b/include/driver.h
@@ -22,7 +22,8 @@
 #ifndef SINGA_DRIVER_H_
 #define SINGA_DRIVER_H_
 
-#include "singa.h"
+#include "proto/job.pb.h"
+#include "proto/singa.pb.h"
 
 namespace singa {
 
@@ -119,48 +120,6 @@ class Driver {
   SingaProto singa_conf_;
 };
 
-template<typename Subclass, typename Type>
-int Driver::RegisterLayer(const Type& type) {
-  auto factory = Singleton<Factory<singa::Layer>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, Layer));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterParam(const Type& type) {
-  auto factory = Singleton<Factory<singa::Param>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, Param));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterParamGenerator(const Type& type) {
-  auto factory = Singleton<Factory<singa::ParamGenerator>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, ParamGenerator));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterUpdater(const Type& type) {
-  auto factory = Singleton<Factory<singa::Updater>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, Updater));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterLRGenerator(const Type& type) {
-  auto factory = Singleton<Factory<singa::LRGenerator>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, LRGenerator));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterWorker(const Type& type) {
-  auto factory = Singleton<Factory<singa::Worker>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, Worker));
-  return 1;
-}
-
 }  // namespace singa
 
 #endif  // SINGA_DRIVER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/366e6a82/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index d3d332f..1c0e039 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -19,26 +19,24 @@
 *
 *************************************************************/
 
-#ifndef INCLUDE_TRAINER_TRAINER_H_
-#define INCLUDE_TRAINER_TRAINER_H_
+#ifndef SINGA_TRAINER_TRAINER_H_
+#define SINGA_TRAINER_TRAINER_H_
 
 #include <queue>
-#include <vector>
 #include <unordered_map>
+#include <vector>
+#include "communication/socket.h"
+#include "neuralnet/neuralnet.h"
 #include "proto/job.pb.h"
 #include "proto/singa.pb.h"
+#include "trainer/server.h"
+#include "trainer/worker.h"
+#include "utils/factory.h"
 #include "utils/param.h"
 #include "utils/singleton.h"
-#include "utils/factory.h"
-#include "neuralnet/neuralnet.h"
-#include "trainer/worker.h"
-#include "trainer/server.h"
-#include "communication/socket.h"
 
 namespace singa {
 
-using std::vector;
-  
 /**
  * Every running process has a training object which launches one or more
  * worker (and server) threads.
@@ -77,7 +75,7 @@ class Trainer{
    * @param jobConf
    * @return server instances
    */
-  vector<Server*> CreateServers(const JobProto& jobConf);
+  std::vector<Server*> CreateServers(const JobProto& jobConf);
   /**
    * Create workers instances.
    * @param nthread total num of threads in current procs which is used to
@@ -86,8 +84,7 @@ class Trainer{
    * @param jobConf
    * @return worker instances
    */
-  vector<Worker*> CreateWorkers(const JobProto& jobConf);
-
+  std::vector<Worker*> CreateWorkers(const JobProto& jobConf);
   /**
    * Setup workers and servers.
    *
@@ -98,12 +95,11 @@ class Trainer{
    * @param workers
    * @param servers
    */
-  void SetupWorkerServer(
-    const JobProto& jobConf,
-    const vector<Worker*>& workers,
-    const vector<Server*>& servers);
-
-  void Run(const vector<Worker*>& workers, const vector<Server*>& servers);
+  void SetupWorkerServer(const JobProto& jobConf,
+                         const std::vector<Worker*>& workers,
+                         const std::vector<Server*>& servers);
+  void Run(const std::vector<Worker*>& workers,
+           const std::vector<Server*>& servers);
   /**
    * Display metrics to log (standard output)
    */
@@ -118,24 +114,20 @@ class Trainer{
    * Handle messages to local servers and local stub
    */
   void HandleLocalMsg(std::queue<Msg*>* msg_queue, Msg** msg);
-
-	/**
-	 * Generate a request message to Get the parameter object.
-	 */
-	const vector<Msg*> HandleGet(ParamEntry* entry, Msg** msg);
-	void HandleGetResponse(ParamEntry* entry, Msg** msg);
-
-	/**
-	 * Generate a request message to Update the parameter object.
-	 */
-	const vector<Msg*> HandleUpdate(ParamEntry* entry, Msg** msg);
+  /**
+   * Generate a request message to Get the parameter object.
+   */
+  const std::vector<Msg*> HandleGet(ParamEntry* entry, Msg** msg);
+  void HandleGetResponse(ParamEntry* entry, Msg** msg);
+  /**
+   * Generate a request message to Update the parameter object.
+   */
+  const std::vector<Msg*> HandleUpdate(ParamEntry* entry, Msg** msg);
   void HandleUpdateResponse(ParamEntry* entry, Msg** msg);
-
   /**
-	 * Generate a request message to Put the parameter object.
-	 */
-	const vector<Msg*> HandlePut(ParamEntry* entry, Msg** msg);
-
+   * Generate a request message to Put the parameter object.
+   */
+  const std::vector<Msg*> HandlePut(ParamEntry* entry, Msg** msg);
   /**
    * Called by HandlePut, HandleUpdate and HandleGet functions
    * @param type message type
@@ -145,7 +137,7 @@ class Trainer{
    * @param ret generated messages
    */
   void GenMsgs(int type, int version, ParamEntry* entry,
-    Msg* msg, vector<Msg*> *ret);
+    Msg* msg, std::vector<Msg*> *ret);
   /**
    * Get a hash id for a Param object from a group.
    *
@@ -157,13 +149,15 @@ class Trainer{
   }
 
  protected:
-  int procs_id_;
-  Router *router_;
+  int procs_id_ = -1;
+  Router *router_ = nullptr;
   std::unordered_map<int, ParamEntry*> worker_shard_;
   //!< map from slice to the server that updates it
-  vector<int> slice2server_;
-  //stub will destroy all neuralnets in the end
-  vector<NeuralNet*> nets_;
+  std::vector<int> slice2server_;
+  // a buffer of created nets, will destroy them all in destructor
+  std::vector<NeuralNet*> nets_;
 };
-} /* singa */
-#endif // INCLUDE_TRAINER_TRAINER_H_
+
+}  // namespace singa
+
+#endif  // SINGA_TRAINER_TRAINER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/366e6a82/src/driver.cc
----------------------------------------------------------------------
diff --git a/src/driver.cc b/src/driver.cc
index 28d21c2..42a1330 100644
--- a/src/driver.cc
+++ b/src/driver.cc
@@ -24,24 +24,27 @@
 #include <cblas.h>
 #include <glog/logging.h>
 #include <string>
+#include "neuralnet/neuralnet.h"
+#include "neuralnet/layer.h"
+#include "trainer/trainer.h"
+#include "utils/common.h"
+#include "utils/factory.h"
+#include "utils/singleton.h"
 #include "utils/tinydir.h"
 
 namespace singa {
 
 void Driver::Init(int argc, char **argv) {
   google::InitGoogleLogging(argv[0]);
-
   //  unique job ID generated from singa-run.sh, passed in as "-singa_job <id>"
   int arg_pos = ArgPos(argc, argv, "-singa_job");
   job_id_ = (arg_pos != -1) ? atoi(argv[arg_pos+1]) : -1;
-
   //  global signa conf passed by singa-run.sh as "-singa_conf <path>"
   arg_pos = ArgPos(argc, argv, "-singa_conf");
   if (arg_pos != -1)
     ReadProtoFromTextFile(argv[arg_pos+1], &singa_conf_);
   else
     ReadProtoFromTextFile("conf/singa.conf", &singa_conf_);
-
   //  job conf passed by users as "-conf <path>"
   arg_pos = ArgPos(argc, argv, "-conf");
   CHECK_NE(arg_pos, -1);
@@ -107,7 +110,47 @@ void Driver::Init(int argc, char **argv) {
   RegisterParamGenerator<UniformSqrtFanInOutGen>(kUniformSqrtFanInOut);
 }
 
+template<typename Subclass, typename Type>
+int Driver::RegisterLayer(const Type& type) {
+  auto factory = Singleton<Factory<singa::Layer>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, Layer));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterParam(const Type& type) {
+  auto factory = Singleton<Factory<singa::Param>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, Param));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterParamGenerator(const Type& type) {
+  auto factory = Singleton<Factory<singa::ParamGenerator>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, ParamGenerator));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterUpdater(const Type& type) {
+  auto factory = Singleton<Factory<singa::Updater>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, Updater));
+  return 1;
+}
 
+template<typename Subclass, typename Type>
+int Driver::RegisterLRGenerator(const Type& type) {
+  auto factory = Singleton<Factory<singa::LRGenerator>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, LRGenerator));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterWorker(const Type& type) {
+  auto factory = Singleton<Factory<singa::Worker>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, Worker));
+  return 1;
+}
 
 void Driver::Submit(bool resume, const JobProto& jobConf) {
   if (singa_conf_.has_log_dir())
@@ -118,9 +161,8 @@ void Driver::Submit(bool resume, const JobProto& jobConf) {
     LOG(FATAL) << "workspace does not exist: " << jobConf.cluster().workspace();
   if (jobConf.num_openblas_threads() != 1)
     LOG(WARNING) << "openblas with "
-      << jobConf.num_openblas_threads() << " threads";
+                 << jobConf.num_openblas_threads() << " threads";
   openblas_set_num_threads(jobConf.num_openblas_threads());
-
   JobProto job;
   job.CopyFrom(jobConf);
   job.set_id(job_id_);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/366e6a82/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
index 5e94de4..5d2ab2f 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -45,20 +45,20 @@
  */
 
 int main(int argc, char **argv) {
-  //  must create driver at the beginning and call its Init method.
+  // must create driver at the beginning and call its Init method.
   singa::Driver driver;
   driver.Init(argc, argv);
 
-  //  if -resume in argument list, set resume to true; otherwise false
+  // if -resume in argument list, set resume to true; otherwise false
   int resume_pos = singa::ArgPos(argc, argv, "-resume");
   bool resume = (resume_pos != -1);
 
-  //  users can register new subclasses of layer, updater, etc.
+  // users can register new subclasses of layer, updater, etc.
 
-  //  get the job conf, and custmize it if need
+  // get the job conf, and custmize it if need
   singa::JobProto jobConf = driver.job_conf();
 
-  //  submit the job
+  // submit the job
   driver.Submit(resume, jobConf);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/366e6a82/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
index 200824a..775a5a7 100644
--- a/src/neuralnet/neuralnet.cc
+++ b/src/neuralnet/neuralnet.cc
@@ -19,10 +19,10 @@
 *
 *************************************************************/
 
+#include "neuralnet/neuralnet.h"
+
 #include <algorithm>
 #include <queue>
-
-#include "neuralnet/neuralnet.h"
 #include "utils/singleton.h"
 
 namespace singa {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/366e6a82/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 8a0589e..ecfc94a 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -19,25 +19,21 @@
 *
 *************************************************************/
 
-#include <thread>
-#include <vector>
-#include <map>
-#include <chrono>
+#include "trainer/trainer.h"
+
 #include <glog/logging.h>
-#include "utils/tinydir.h"
 #include <unistd.h>
+#include <map>
+#include <thread>
+#include "mshadow/tensor.h"
+#include "proto/common.pb.h"
 #include "utils/cluster.h"
 #include "utils/common.h"
-#include "proto/common.pb.h"
-#include "trainer/trainer.h"
-#include "mshadow/tensor.h"
-
+#include "utils/tinydir.h"
 
 namespace singa {
+
 using std::vector;
-using std::map;
-using std::queue;
-using namespace std::chrono;
 using std::string;
 
 /***********************Trainer****************************/
@@ -47,12 +43,82 @@ Trainer::~Trainer() {
     delete p;
 }
 
+void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
+  // register job to zookeeper at the beginning
+  auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster());
+  if (resume) Resume(job);
+  router_ = new Router();
+  router_->Bind(kInprocRouterEndpoint);
+  const string hostip = cluster->hostip();
+  int port = router_->Bind("tcp://" + hostip + ":*");
+  // register endpoint to zookeeper
+  cluster->Register(getpid(), hostip + ":" + std::to_string(port));
+  const vector<Worker*> workers = CreateWorkers(*job);
+  const vector<Server*> servers = CreateServers(*job);
+  SetupWorkerServer(*job, workers, servers);
+#ifdef USE_MPI
+  int nthreads = workers.size() + servers.size();
+  for (int i = 0; i < nthreads; i++)
+    MPIQueues.push_back(make_shared<SafeQueue>());
+#endif
+  vector<std::thread> threads;
+  for (auto server : servers)
+    threads.push_back(std::thread(&Server::Run, server));
+  for (auto worker : workers)
+    threads.push_back(std::thread(&Worker::Run, worker));
+  Run(workers, servers);
+  for (auto& thread : threads)
+    thread.join();
+  for (auto server : servers)
+    delete server;
+  for (auto worker : workers)
+    delete worker;
+}
+
+void Trainer::Resume(JobProto* jobConf) {
+  tinydir_dir dir;
+  string folder = Cluster::Get()->checkpoint_folder();
+  tinydir_open(&dir, folder.c_str());
+  int latest_step = 0;
+  // there would be multi checkpoint files (from diff workers) for one step
+  vector<string> ck_files;
+  // iterate all files to get the files for the last checkpoint
+  while (dir.has_next) {
+    tinydir_file file;
+    tinydir_readfile(&dir, &file);
+    tinydir_next(&dir);
+    char* ch = strstr(file.name, "step");
+    if (ch == nullptr) {
+      if (file.name[0] != '.')
+        LOG(INFO) << "Irregular file in checkpoint folder: " << file.name;
+      continue;
+    }
+    LOG(INFO) << "Add checkpoint file for resume: " << ch;
+    int step = atoi(ch+4);
+    if (step == latest_step) {
+      ck_files.push_back(file.name);
+    } else if (step > latest_step) {
+      latest_step = step;
+      ck_files.clear();
+      ck_files.push_back(string(file.name));
+    }
+  }
+  if (latest_step > 0) {
+    jobConf->set_step(latest_step);
+    if (!jobConf->has_reset_param_version())
+      jobConf->set_reset_param_version(false);
+    jobConf->clear_checkpoint_path();
+    for (auto ck_file : ck_files)
+      jobConf->add_checkpoint_path(folder + "/" + ck_file);
+  }
+  tinydir_close(&dir);
+}
+
 const vector<int> SliceParams(const vector<Param*>& params) {
   // for load-balance among servers in a group and among server groups
   int nserver_grps = Cluster::Get()->nserver_groups();
   int nservers_per_grp = Cluster::Get()->nservers_per_group();
   int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp);
-
   // collect sizes of unique Params
   std::vector<int> paramsize;
   for (auto param : params)
@@ -86,10 +152,9 @@ const vector<int> SliceParams(const vector<Param*>& params) {
   return slices;
 }
 
-void Trainer::SetupWorkerServer(
-    const JobProto& job_conf,
-    const vector<Worker*>& workers,
-    const vector<Server*>& servers) {
+void Trainer::SetupWorkerServer(const JobProto& job_conf,
+                                const vector<Worker*>& workers,
+                                const vector<Server*>& servers) {
   auto cluster = Cluster::Get();
   int grp_size = cluster->nworkers_per_group();
   const auto& net_conf = job_conf.neuralnet();
@@ -97,7 +162,6 @@ void Trainer::SetupWorkerServer(
   nets_.push_back(net);
   // MUST do SliceParam before share param/net with others
   auto slices = SliceParams(net->params());
-
   std::unordered_map<int, NeuralNet*> grp_net;
   int first_grp = workers.size() ? workers.at(0)->grp_id() : -1;
   for (auto worker : workers) {
@@ -107,13 +171,17 @@ void Trainer::SetupWorkerServer(
     NeuralNet* valid_net = nullptr;
     if (grp_net.find(grp_id) == grp_net.end()) {
       if (grp_id == first_grp) {
-        //  test are performed only by the first group now. TODO update.
+        // test are performed only by the first group now.
+        // TODO(wangwei) update.
         if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) {
-          test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp
+          // hard code for exp
+          // TODO(wangwei) move test unit out as an independent module
+          test_net = NeuralNet::Create(net_conf, kTest, 1);
           test_net->ShareParamsFrom(net);
           nets_.push_back(test_net);
         }
-        //  validation are performed only by the first group. TODO update.
+        // validation are performed only by the first group.
+        // TODO(wangwei) update.
         if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) {
           valid_net = NeuralNet::Create(net_conf, kValidation, 1);
           valid_net->ShareParamsFrom(net);
@@ -123,7 +191,7 @@ void Trainer::SetupWorkerServer(
       } else {
         grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size);
         nets_.push_back(grp_net[grp_id]);
-        if(cluster->share_memory())
+        if (cluster->share_memory())
           grp_net[grp_id]->ShareParamsFrom(net);
       }
       for (auto layer : grp_net[grp_id]->layers()) {
@@ -141,12 +209,10 @@ void Trainer::SetupWorkerServer(
               << worker->id() << " net " << grp_net[grp_id];
     worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net);
   }
-
   //  partition among server groups, each group maintains one sub-set for sync
   auto slice2group = PartitionSlices(cluster->nserver_groups(), slices);
   //  partition within one server group, each server updates for one sub-set
   slice2server_ = PartitionSlices(cluster->nservers_per_group(), slices);
-
   for (auto server : servers)
     server->Setup(job_conf.updater(), slice2group, slice2server_);
 }
@@ -156,14 +222,13 @@ vector<Server*> Trainer::CreateServers(const JobProto& job) {
   vector<Server*> servers;
   if (!cluster->has_server())
     return servers;
-
   int server_procs = cluster->procs_id();
   // if true, server procs (logical) id starts after worker procs
   if (cluster->server_worker_separate())
     server_procs -= cluster->nworker_procs();
   const vector<int> rng = cluster->ExecutorRng(server_procs,
-      cluster->nservers_per_group(),
-      cluster->nservers_per_procs());
+                                               cluster->nservers_per_group(),
+                                               cluster->nservers_per_procs());
   int gstart = rng[0], gend = rng[1], start = rng[2], end = rng[3];
   for (int gid = gstart; gid < gend; gid++) {
     for (int sid = start; sid < end; sid++) {
@@ -174,15 +239,14 @@ vector<Server*> Trainer::CreateServers(const JobProto& job) {
   return servers;
 }
 
-
 vector<Worker*> Trainer::CreateWorkers(const JobProto& job) {
-  auto cluster=Cluster::Get();
+  auto cluster = Cluster::Get();
   vector<Worker*> workers;
-  if(!cluster->has_worker())
+  if (!cluster->has_worker())
     return workers;
   const vector<int> rng = cluster->ExecutorRng(cluster->procs_id(),
-      cluster->nworkers_per_group(),
-      cluster->nworkers_per_procs());
+                                               cluster->nworkers_per_group(),
+                                               cluster->nworkers_per_procs());
   int gstart = rng[0], gend = rng[1], wstart = rng[2], wend = rng[3];
   for (int gid = gstart; gid < gend; gid++) {
     for (int wid = wstart; wid < wend; wid++) {
@@ -194,93 +258,13 @@ vector<Worker*> Trainer::CreateWorkers(const JobProto& job) {
   return workers;
 }
 
-void Trainer::Resume(JobProto* jobConf) {
-  tinydir_dir dir;
-  string folder = Cluster::Get()->checkpoint_folder();
-  tinydir_open(&dir, folder.c_str());
-  int latest_step = 0;
-  // there would be multi checkpoint files (from diff workers) for one step
-  vector<string> ck_files;
-  // iterate all files to get the files for the last checkpoint
-  while (dir.has_next) {
-    tinydir_file file;
-    tinydir_readfile(&dir, &file);
-    tinydir_next(&dir);
-    char* ch = strstr(file.name, "step");
-    if (ch == nullptr) {
-      if (file.name[0] != '.')
-        LOG(INFO) << "Irregular file in checkpoint folder: " << file.name;
-      continue;
-    }
-
-    LOG(INFO) << "Add checkpoint file for resume: " << ch;
-    int step = atoi(ch+4);
-    if (step == latest_step) {
-      ck_files.push_back(file.name);
-    } else if(step > latest_step) {
-      latest_step = step;
-      ck_files.clear();
-      ck_files.push_back(string(file.name));
-    }
-  }
-
-  if (latest_step > 0) {
-    jobConf->set_step(latest_step);
-    if (!jobConf->has_reset_param_version())
-      jobConf->set_reset_param_version(false);
-    jobConf->clear_checkpoint_path();
-    for (auto ck_file : ck_files)
-      jobConf->add_checkpoint_path(folder + "/" + ck_file);
-  }
-  tinydir_close(&dir);
-}
-
-void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
-  // register job to zookeeper at the beginning
-  auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster());
-  if (resume)
-    Resume(job);
-
-  router_ = new Router();
-  router_->Bind(kInprocRouterEndpoint);
-  const string hostip = cluster->hostip();
-  int port = router_->Bind("tcp://" + hostip + ":*");
-  // register endpoint to zookeeper
-  cluster->Register(getpid(), hostip + ":" + std::to_string(port));
-
-  const vector<Worker*> workers = CreateWorkers(*job);
-  const vector<Server*> servers = CreateServers(*job);
-  SetupWorkerServer(*job, workers, servers);
-
-#ifdef USE_MPI
-  int nthreads = workers.size() + servers.size();
-  for (int i = 0; i < nthreads; i++)
-    MPIQueues.push_back(make_shared<SafeQueue>());
-#endif
-  vector<std::thread> threads;
-  for(auto server : servers)
-    threads.push_back(std::thread(&Server::Run, server));
-  for(auto worker : workers)
-    threads.push_back(std::thread(&Worker::Run, worker));
-  Run(workers, servers);
-  for(auto& thread : threads)
-    thread.join();
-  for(auto server : servers)
-    delete server;
-  for(auto worker : workers)
-    delete worker;
-}
-
-void Trainer::Run(
-    const vector<Worker*>& workers,
-    const vector<Server*>& servers) {
+void Trainer::Run(const vector<Worker*>& workers,
+                  const vector<Server*>& servers) {
   int nworkers = workers.size(), nservers = servers.size();
   auto cluster = Cluster::Get();
   procs_id_ = cluster->procs_id();
   LOG(INFO) << "Stub in process " << procs_id_ << " starts";
-
-  map<int, Dealer*> inter_dealers;  // for sending msg to other procs
-
+  std::map<int, Dealer*> inter_dealers;  // for sending msg to other procs
   std::queue<Msg*> msg_queue;
   while (true) {
     Msg* msg = nullptr;
@@ -343,26 +327,27 @@ Dealer* Trainer::CreateInterProcsDealer(int dst_procs) {
   // forward to other procs
   auto cluster = Cluster::Get();
   auto dealer = new Dealer();
-  while(cluster->endpoint(dst_procs)=="") {
-    //kCollectSleepTime));
+  while (cluster->endpoint(dst_procs) == "") {
+    // kCollectSleepTime));
     std::this_thread::sleep_for(std::chrono::milliseconds(3000));
-    LOG(ERROR)<<"waiting for procs "<< dst_procs<<" to register";
+    LOG(ERROR) << "waiting for procs " << dst_procs << " to register";
   }
   dealer->Connect("tcp://"+cluster->endpoint(dst_procs));
   return dealer;
 }
 
-void Trainer::HandleLocalMsg(queue<Msg*>* msg_queue, Msg** msg) {
+void Trainer::HandleLocalMsg(std::queue<Msg*>* msg_queue, Msg** msg) {
   Msg* msgg = *msg;
   int paramid = ParamID(msgg->trgt_val());
   int type = msgg->type();
   int grp;
   ParamEntry *entry = nullptr;
-  switch (type) {  // TODO process other requests, e.g. RESTful
+  // TODO(wangwei) process other requests, e.g. RESTful
+  switch (type) {
     case kUpdate:
       grp = AddrGrp(msgg->src());
       entry = worker_shard_.at(Hash(grp, paramid));
-      for(auto update_msg : HandleUpdate(entry, msg))
+      for (auto update_msg : HandleUpdate(entry, msg))
         msg_queue->push(update_msg);
       break;
     case kRUpdate:
@@ -373,7 +358,7 @@ void Trainer::HandleLocalMsg(queue<Msg*>* msg_queue, Msg** msg) {
     case kGet:
       grp = AddrGrp(msgg->src());
       entry = worker_shard_.at(Hash(grp, paramid));
-      for(auto get_msg : HandleGet(entry, msg))
+      for (auto get_msg : HandleGet(entry, msg))
         msg_queue->push(get_msg);
       break;
     case kRGet:
@@ -384,22 +369,22 @@ void Trainer::HandleLocalMsg(queue<Msg*>* msg_queue, Msg** msg) {
     case kPut:
       grp = AddrGrp(msgg->src());
       entry = worker_shard_.at(Hash(grp, paramid));
-      for(auto put_msg : HandlePut(entry, msg))
+      for (auto put_msg : HandlePut(entry, msg))
         msg_queue->push(put_msg);
       break;
     default:
-      LOG(ERROR)<<"Unknow message type:"<<type;
+      LOG(ERROR) << "Unknow message type:" << type;
       break;
   }
 }
 
-void Trainer::GenMsgs(int type, int version, ParamEntry* entry,
-    Msg* msg, vector<Msg*> *ret) {
+void Trainer::GenMsgs(int type, int version, ParamEntry* entry, Msg* msg,
+                      vector<Msg*> *ret) {
   int src_grp = AddrGrp(msg->src());
   int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group();
-  auto param=entry->shares.at(0);
+  auto param = entry->shares.at(0);
   for (int idx = 0 ; idx < param->num_slices(); idx++) {
-    int slice_id =param->slice_start() + idx;
+    int slice_id = param->slice_start() + idx;
     int server = slice2server_[slice_id];
     int dst_procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer);
     Msg* new_msg = nullptr;
@@ -440,10 +425,10 @@ const vector<Msg*> Trainer::HandleUpdate(ParamEntry *entry, Msg** msg) {
     // average local gradient
     if (entry->num_local > 1) {
       auto it = entry->shares.begin();
-      auto shape=mshadow::Shape1((*it)->size());
-      mshadow::Tensor<mshadow::cpu,1> sum((*it)->mutable_cpu_grad(), shape);
+      auto shape = mshadow::Shape1((*it)->size());
+      mshadow::Tensor<mshadow::cpu, 1> sum((*it)->mutable_cpu_grad(), shape);
       for (++it; it != entry->shares.end(); it++) {
-        mshadow::Tensor<mshadow::cpu,1> grad((*it)->mutable_cpu_grad(), shape);
+        mshadow::Tensor<mshadow::cpu, 1> grad((*it)->mutable_cpu_grad(), shape);
         sum += grad;
       }
     }
@@ -480,4 +465,5 @@ void Trainer::HandleUpdateResponse(ParamEntry* entry, Msg** msg) {
     param->set_version(version);
   DeleteMsg(msg);
 }
-} /* singa */
+
+}  // namespace singa


[3/5] incubator-singa git commit: SINGA-21 Code review 5

Posted by wa...@apache.org.
SINGA-21 Code review 5

review worker.h, worker.cc
 - format code
 - change shared_ptr to raw ptr for neuralnet object
    all neuralnet object will be managed by trainer
    trainer passes pointers to workers and releases them in destructor

others
 - remove virtual keyword in Server class (server.h)


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/f50d293f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/f50d293f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/f50d293f

Branch: refs/heads/master
Commit: f50d293ff550d5b8ccace9f7e6992865474d0d29
Parents: d3e1fca
Author: wang sheng <wa...@gmail.com>
Authored: Wed Sep 23 13:15:42 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Wed Sep 23 13:28:28 2015 +0800

----------------------------------------------------------------------
 include/neuralnet/neuralnet.h |  10 +-
 include/trainer/server.h      |   9 +-
 include/trainer/trainer.h     |  12 +-
 include/trainer/worker.h      | 129 ++++++++++++---------
 src/neuralnet/neuralnet.cc    |   8 +-
 src/trainer/server.cc         |   3 +-
 src/trainer/trainer.cc        |  24 ++--
 src/trainer/worker.cc         | 229 ++++++++++++++++---------------------
 8 files changed, 208 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/neuralnet/neuralnet.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/neuralnet.h b/include/neuralnet/neuralnet.h
index 06fd977..693fe19 100644
--- a/include/neuralnet/neuralnet.h
+++ b/include/neuralnet/neuralnet.h
@@ -22,8 +22,6 @@
 #ifndef SINGA_NEURALNET_NEURALNET_H_
 #define SINGA_NEURALNET_NEURALNET_H_
 
-#include <map>
-#include <memory>
 #include <string>
 #include <vector>
 
@@ -52,10 +50,10 @@ class NeuralNet {
    * @param net_conf proto for the neural network
    * @param phase test/training/validation
    * @param npartitions num of partitions, do partitioning if num > 1
-   * @return shared pointer to a neural net
+   * @return pointer to a neural net
    */
-  static std::shared_ptr<NeuralNet> Create(const NetProto& net_conf,
-                                           Phase phase, int npartitions);
+  static NeuralNet* Create(const NetProto& net_conf, Phase phase,
+                           int npartitions);
 
   /**
    * construct the net structure from protocol buffer.
@@ -71,7 +69,7 @@ class NeuralNet {
   /**
    * Share memory of parameter values from other neuralnet
    */
-  void ShareParamsFrom(std::shared_ptr<NeuralNet> other);
+  void ShareParamsFrom(NeuralNet* other);
   inline const std::vector<Layer*>& layers() { return layers_; }
   inline const std::vector<Param*>& params() const { return params_; }
   inline Layer* name2layer(std::string name) const {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/server.h
----------------------------------------------------------------------
diff --git a/include/trainer/server.h b/include/trainer/server.h
index 3f3539a..84b3a41 100644
--- a/include/trainer/server.h
+++ b/include/trainer/server.h
@@ -22,7 +22,6 @@
 #ifndef SINGA_TRAINER_SERVER_H_
 #define SINGA_TRAINER_SERVER_H_
 
-#include <memory>
 #include <unordered_map>
 #include <vector>
 #include "communication/socket.h"
@@ -45,7 +44,7 @@ namespace singa {
 class Server {
  public:
   Server(int group_id, int server_id);
-  virtual ~Server();
+  ~Server();
   void Setup(const UpdaterProto& proto, const std::vector<int>& slice2group,
              const std::vector<int>& slice2server);
   void Run();
@@ -59,7 +58,7 @@ class Server {
    * @return the orignal message or a response message which contains the values
    * of the Param with the request version.
    */
-  virtual Msg* HandleGet(Msg** msg);
+  Msg* HandleGet(Msg** msg);
   /**
    * Process Update request.
    *
@@ -88,7 +87,7 @@ class Server {
    * @return the original message or response message. If we don't want to
    * acknowledge the put request, then return nullptr.
    */
-  virtual Msg* HandlePut(Msg **msg);
+  Msg* HandlePut(Msg **msg);
   /**
    * Handle sync request from other server groups.
    *
@@ -100,7 +99,7 @@ class Server {
    * @param msg request msg containing the parameter updates
    * @return response msg that contains the fresh parameter values.
    */
-  virtual Msg* HandleSyncRequest(Msg** msg);
+  Msg* HandleSyncRequest(Msg** msg);
   /**
    * Handle sync response.
    *

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 0b03dea..d3d332f 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -21,8 +21,10 @@
 
 #ifndef INCLUDE_TRAINER_TRAINER_H_
 #define INCLUDE_TRAINER_TRAINER_H_
-#include <unordered_map>
+
 #include <queue>
+#include <vector>
+#include <unordered_map>
 #include "proto/job.pb.h"
 #include "proto/singa.pb.h"
 #include "utils/param.h"
@@ -34,13 +36,15 @@
 #include "communication/socket.h"
 
 namespace singa {
+
+using std::vector;
+  
 /**
  * Every running process has a training object which launches one or more
  * worker (and server) threads.
  *
  * The main thread runs a loop to forward messages between workers and servers.
  */
-
 class Trainer{
  public:
   ~Trainer();
@@ -82,7 +86,7 @@ class Trainer{
    * @param jobConf
    * @return worker instances
    */
-  vector<Worker*> CreateWorkers(int nthread, const JobProto& jobConf);
+  vector<Worker*> CreateWorkers(const JobProto& jobConf);
 
   /**
    * Setup workers and servers.
@@ -158,6 +162,8 @@ class Trainer{
   std::unordered_map<int, ParamEntry*> worker_shard_;
   //!< map from slice to the server that updates it
   vector<int> slice2server_;
+  //stub will destroy all neuralnets in the end
+  vector<NeuralNet*> nets_;
 };
 } /* singa */
 #endif // INCLUDE_TRAINER_TRAINER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
index 679435c..66439ec 100644
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@ -21,19 +21,16 @@
 
 #ifndef SINGA_TRAINER_WORKER_H_
 #define SINGA_TRAINER_WORKER_H_
+
+#include <string>
+#include "communication/socket.h"
 #include "neuralnet/neuralnet.h"
 #include "proto/job.pb.h"
-#include "communication/socket.h"
 
 namespace singa {
 
-using std::map;
-using std::shared_ptr;
-using std::string;
-using std::vector;
-  
 //!< sleep 5 milliseconds if the Param is not updated to the expected version
-const int kCollectSleepTime=5;
+const int kCollectSleepTime = 5;
 /**
  * The Worker class which runs the training algorithm.
  * The first worker group will initialize parameters of the Net,
@@ -53,19 +50,13 @@ class Worker {
    * @param grp_id global worker group ID
    * @param id worker ID within the group
    */
-  virtual void Init(int thread_id, int grp_id, int id);
+  virtual void Init(int grp_id, int id);
   virtual ~Worker();
   /**
    * Setup members
    */
-  void Setup(const JobProto& job, shared_ptr<NeuralNet> train_net,
-      shared_ptr<NeuralNet> valid_net, shared_ptr<NeuralNet> test_net);
-  /**
-    * Main function of Worker.
-    *
-    * Train the neuralnet step by step, test/validation is done periodically.
-    */
-  void Run();
+  void Setup(const JobProto& job, NeuralNet* train_net, NeuralNet* valid_net,
+             NeuralNet* test_net);
   /**
    * Init all local params (i.e., params from layers resident in this worker).
    *
@@ -78,12 +69,17 @@ class Worker {
    * train for a couple of steps to warmup the params before put
    * them to servers (warmup of JobProto controls this).
    *
-   * If the owner param is availabel from checkpoint file, then its
+   * If the owner param is available from checkpoint file, then its
    * values are parsed from the checkpoint file instead of randomly initialized.
    * For params who do not have checkpoints, randomly init them.
    */
   void InitLocalParams();
-
+  /**
+    * Main function of Worker.
+    *
+    * Train the neuralnet step by step, test/validation is done periodically.
+    */
+  void Run();
   /**
    * Checkpoint all params owned by the worker from the first group onto disk.
    * The serialization is done using BlobProtos which includes the name, version
@@ -93,31 +89,30 @@ class Worker {
    * @param step training step of this worker
    * @param net the training net whose params will be dumped.
    */
-  void Checkpoint(int step, shared_ptr<NeuralNet> net);
+  void Checkpoint(int step, NeuralNet* net);
   /**
     * Test the perforance of the learned model on validation or test dataset.
     * Test is done by the first group.
     * @param net, neural network
     */
-  void Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net);
+  void Test(int nsteps, Phase phase, NeuralNet* net);
   /**
     * Train one mini-batch.
     * Test/Validation is done before training.
     */
-  virtual void TrainOneBatch(int step, Metric* perf)=0;
+  virtual void TrainOneBatch(int step, Metric* perf) = 0;
   /**
    * Test/validate one mini-batch.
    */
-  virtual void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
-      Metric* perf)=0;
+  virtual void TestOneBatch(int step, Phase phase, NeuralNet* net,
+                            Metric* perf) = 0;
   /**
    * Report performance to the stub.
    *
    * @param prefix display prefix, e.g., 'Train', 'Test'
    * @param perf
    */
-  void Report(const string& prefix, const Metric & perf);
-
+  void Report(const std::string& prefix, const Metric & perf);
   /**
    * Put Param to server.
    * @param param
@@ -148,80 +143,101 @@ class Worker {
   /**
    * Call Collect for every param of net
    */
-  int CollectAll(shared_ptr<NeuralNet> net, int step);
+  int CollectAll(NeuralNet* net, int step);
   /**
    * Receive blobs from other workers due to model partitions.
    */
-  void ReceiveBlobs(
-    bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net);
+  void ReceiveBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net);
   /**
    * Send blobs to other workers due to model partitions.
    */
-  void SendBlobs(
-    bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net);
-
+  void SendBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net);
   /**
    * Check is it time to display training info, e.g., loss and precison.
    */
-  inline bool DisplayNow(int step) const;
+  inline bool DisplayNow(int step) const {
+    return job_conf_.disp_freq() > 0
+           && step >= job_conf_.disp_after()
+           && ((step - job_conf_.disp_after()) % job_conf_.disp_freq() == 0);
+  }
   /**
    * Check is it time to display training info, e.g., loss and precison.
    */
-  inline bool DisplayDebugInfo(int step) const;
+  inline bool DisplayDebugInfo(int step) const {
+    return DisplayNow(step) && job_conf_.debug() && grp_id_ == 0;
+  }
   /**
    * Check is it time to stop
    */
-  inline bool StopNow(int step) const;
+  inline bool StopNow(int step) const {
+    return step >= job_conf_.train_steps();
+  }
   /**
    * Check is it time to do checkpoint.
    */
-  inline bool CheckpointNow(int step) const;
+  inline bool CheckpointNow(int step) const {
+    return grp_id_ == 0
+           && job_conf_.checkpoint_freq() > 0
+           && step >= job_conf_.checkpoint_after()
+           && ((step - job_conf_.checkpoint_after())
+              % job_conf_.checkpoint_freq() == 0);
+  }
   /**
    * Check is it time to do test.
    * @param step the ::Train() has been called this num times.
    */
-  inline bool TestNow(int step) const;
+  inline bool TestNow(int step) const {
+    return grp_id_ == 0
+           && job_conf_.test_freq() > 0
+           && job_conf_.test_steps() > 0
+           && step >= job_conf_.test_after()
+           && ((step - job_conf_.test_after()) % job_conf_.test_freq() == 0);
+  }
   /**
    * Check is it time to do validation.
    * @param step the ::Train() has been called step times.
    */
-  inline bool ValidateNow(int step) const;
-
+  inline bool ValidateNow(int step) const {
+    return grp_id_ == 0
+           && job_conf_.valid_freq() > 0
+           && job_conf_.valid_steps() > 0
+           && step >= job_conf_.valid_after()
+           && ((step - job_conf_.valid_after()) % job_conf_.valid_freq() == 0);
+  }
   /**
    * @return group ID
    */
-  int grp_id() const { return grp_id_;}
-
+  int grp_id() const { return grp_id_; }
   /**
    * @reutrn worker ID within the worker group.
    */
-  int id() const { return id_;}
+  int id() const { return id_; }
 
  protected:
-  int thread_id_, grp_id_, id_;
-  int step_;
+  int grp_id_ = -1, id_ = -1;
+  int step_ = 0;
   JobProto job_conf_;
-  shared_ptr<NeuralNet> train_net_, test_net_, validation_net_;
-  Dealer* layer_dealer_, *dealer_;
+  NeuralNet* train_net_ = nullptr;
+  NeuralNet* test_net_ = nullptr;
+  NeuralNet* validation_net_ = nullptr;
+  Dealer* layer_dealer_ = nullptr;
+  Dealer* dealer_ = nullptr;
 };
 
-class BPWorker: public Worker{
+class BPWorker: public Worker {
  public:
-  ~BPWorker(){}
-  void Init(int thread_id, int grp_id, int id) override;
   void TrainOneBatch(int step, Metric* perf) override;
-  void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
-      Metric* perf) override;
-
-  void Forward(int step, Phase phase, shared_ptr<NeuralNet> net, Metric* perf);
-  void Backward(int step, shared_ptr<NeuralNet> net);
+  void TestOneBatch(int step, Phase phase, NeuralNet* net, Metric* perf)
+      override;
+  void Forward(int step, Phase phase, NeuralNet* net, Metric* perf);
+  void Backward(int step, NeuralNet* net);
 };
 
-class CDWorker: public Worker{
+class CDWorker: public Worker {
  public:
   void TrainOneBatch(int step, Metric* perf) override;
-  void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
-      Metric* perf) override;
+  void TestOneBatch(int step, Phase phase, NeuralNet* net, Metric* perf)
+      override;
 };
 
 inline int BlobTrgt(int grp, int layer) {
@@ -236,6 +252,7 @@ inline int BlobLayer(int blob_trgt) {
   static int mask = (1 << 16) -1;
   return blob_trgt & mask;
 }
+
 }  // namespace singa
 
 #endif  // SINGA_TRAINER_WORKER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
index 286d273..200824a 100644
--- a/src/neuralnet/neuralnet.cc
+++ b/src/neuralnet/neuralnet.cc
@@ -28,11 +28,10 @@
 namespace singa {
 
 using std::map;
-using std::shared_ptr;
 using std::string;
 using std::vector;
 
-shared_ptr<NeuralNet> NeuralNet::Create(const NetProto& net_conf, Phase phase,
+NeuralNet* NeuralNet::Create(const NetProto& net_conf, Phase phase,
                                         int npartitions) {
   NetProto conf;
   conf.CopyFrom(net_conf);
@@ -76,8 +75,7 @@ shared_ptr<NeuralNet> NeuralNet::Create(const NetProto& net_conf, Phase phase,
   }
   LOG(INFO) << "NeuralNet config is\n" << conf.DebugString();
   // TODO(wangwei) create net based on net type, e.g., directed, undirected, etc
-  auto net = std::make_shared<NeuralNet>(conf, npartitions);
-  return net;
+  return new NeuralNet(conf, npartitions);
 }
 
 NeuralNet::NeuralNet(NetProto netproto, int npartitions) {
@@ -107,7 +105,7 @@ std::string NeuralNet::ToAdjacency() {
   return disp;
 }
 
-void NeuralNet::ShareParamsFrom(shared_ptr<NeuralNet> other) {
+void NeuralNet::ShareParamsFrom(NeuralNet* other) {
   for (auto& layer : layers_) {
     auto otherlayer = other->name2layer(layer->name());
     if (otherlayer != nullptr) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 29f6a68..5e74c1b 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -19,11 +19,12 @@
 *
 *************************************************************/
 
+#include "trainer/server.h"
+
 #include <thread>
 #include <chrono>
 #include "mshadow/tensor.h"
 #include "proto/common.pb.h"
-#include "trainer/server.h"
 #include "utils/param.h"
 #include "utils/singleton.h"
 #include "utils/factory.h"

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index c928d91..8a0589e 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -38,11 +38,13 @@ using std::vector;
 using std::map;
 using std::queue;
 using namespace std::chrono;
-using std::make_shared;
+using std::string;
 
 /***********************Trainer****************************/
 Trainer::~Trainer() {
   delete router_;
+  for (NeuralNet* p : nets_)
+    delete p;
 }
 
 const vector<int> SliceParams(const vector<Param*>& params) {
@@ -92,30 +94,35 @@ void Trainer::SetupWorkerServer(
   int grp_size = cluster->nworkers_per_group();
   const auto& net_conf = job_conf.neuralnet();
   auto net = NeuralNet::Create(net_conf, kTrain, grp_size);
+  nets_.push_back(net);
   // MUST do SliceParam before share param/net with others
   auto slices = SliceParams(net->params());
 
-  std::unordered_map<int, shared_ptr<NeuralNet>> grp_net;
+  std::unordered_map<int, NeuralNet*> grp_net;
   int first_grp = workers.size() ? workers.at(0)->grp_id() : -1;
   for (auto worker : workers) {
     int grp_id = worker->grp_id();
     int worker_id = worker->id();
-    shared_ptr<NeuralNet> test_net = nullptr, valid_net = nullptr;
+    NeuralNet* test_net = nullptr;
+    NeuralNet* valid_net = nullptr;
     if (grp_net.find(grp_id) == grp_net.end()) {
       if (grp_id == first_grp) {
         //  test are performed only by the first group now. TODO update.
         if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) {
           test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp
           test_net->ShareParamsFrom(net);
+          nets_.push_back(test_net);
         }
         //  validation are performed only by the first group. TODO update.
         if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) {
           valid_net = NeuralNet::Create(net_conf, kValidation, 1);
           valid_net->ShareParamsFrom(net);
+          nets_.push_back(valid_net);
         }
         grp_net[grp_id] = net;
       } else {
         grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size);
+        nets_.push_back(grp_net[grp_id]);
         if(cluster->share_memory())
           grp_net[grp_id]->ShareParamsFrom(net);
       }
@@ -131,7 +138,7 @@ void Trainer::SetupWorkerServer(
       }
     }
     LOG(INFO) << "grp " << worker->grp_id() << ", worker "
-      << worker->id() << " net " << grp_net[grp_id].get();
+              << worker->id() << " net " << grp_net[grp_id];
     worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net);
   }
 
@@ -168,7 +175,7 @@ vector<Server*> Trainer::CreateServers(const JobProto& job) {
 }
 
 
-vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
+vector<Worker*> Trainer::CreateWorkers(const JobProto& job) {
   auto cluster=Cluster::Get();
   vector<Worker*> workers;
   if(!cluster->has_worker())
@@ -180,7 +187,7 @@ vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
   for (int gid = gstart; gid < gend; gid++) {
     for (int wid = wstart; wid < wend; wid++) {
       auto *worker = Worker::Create(job);
-      worker->Init(nthreads++,gid, wid);
+      worker->Init(gid, wid);
       workers.push_back(worker);
     }
   }
@@ -241,13 +248,12 @@ void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
   // register endpoint to zookeeper
   cluster->Register(getpid(), hostip + ":" + std::to_string(port));
 
-  int nthreads = 1;
-  const vector<Worker*> workers = CreateWorkers(nthreads, *job);
-  nthreads += workers.size();
+  const vector<Worker*> workers = CreateWorkers(*job);
   const vector<Server*> servers = CreateServers(*job);
   SetupWorkerServer(*job, workers, servers);
 
 #ifdef USE_MPI
+  int nthreads = workers.size() + servers.size();
   for (int i = 0; i < nthreads; i++)
     MPIQueues.push_back(make_shared<SafeQueue>());
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 23382e3..70859de 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -19,18 +19,19 @@
 *
 *************************************************************/
 
+#include "trainer/worker.h"
+
 #include <glog/logging.h>
-#include <thread>
 #include <chrono>
 #include <thread>
 #include <typeinfo>
-#include "utils/singleton.h"
 #include "utils/cluster.h"
 #include "utils/factory.h"
-#include "trainer/worker.h"
+#include "utils/singleton.h"
 
 namespace singa {
-using std::thread;
+
+using std::string;
 
 Worker* Worker::Create(const JobProto& proto) {
   auto factory = Singleton<Factory<singa::Worker>>::Instance();
@@ -43,22 +44,12 @@ Worker* Worker::Create(const JobProto& proto) {
   return worker;
 }
 
-void Worker::Init(int thread_id, int grp_id, int id) {
-  thread_id_ = thread_id;
+void Worker::Init(int grp_id, int id) {
   grp_id_ = grp_id;
   id_ = id;
   layer_dealer_ = dealer_ = nullptr;
 }
 
-void Worker::Setup(
-    const JobProto& job, shared_ptr<NeuralNet> train_net,
-    shared_ptr<NeuralNet> valid_net, shared_ptr<NeuralNet> test_net) {
-  job_conf_.CopyFrom(job);
-  train_net_ = train_net;
-  validation_net_ = valid_net;
-  test_net_ = test_net;
-}
-
 Worker::~Worker() {
   if (layer_dealer_)
     delete layer_dealer_;
@@ -66,17 +57,25 @@ Worker::~Worker() {
     delete dealer_;
 }
 
+void Worker::Setup(const JobProto& job, NeuralNet* train_net,
+                   NeuralNet* valid_net, NeuralNet* test_net) {
+  job_conf_.CopyFrom(job);
+  train_net_ = train_net;
+  validation_net_ = valid_net;
+  test_net_ = test_net;
+}
+
 void Worker::InitLocalParams() {
   // for each server grp, its first subscriber worker grp does the param init
   if (grp_id_ % Cluster::Get()->nworker_groups_per_server_group() == 0) {
     // extract params that should be initialized by this worker
     // must gen a name for each param if the user doesn't config it
     std::unordered_map<string, Param*> name2param;
-    for (auto layer: train_net_->layers()){
+    for (auto layer : train_net_->layers()) {
       if (layer->partition_id() == id_) {
         for (auto param : layer->GetParams()) {
           // only owners fill the memory of parameter values.
-          if(param->owner() == param->id()) {
+          if (param->owner() == param->id()) {
             CHECK(name2param.find(param->name()) == name2param.end());
             name2param[param->name()] = param;
           }
@@ -94,7 +93,7 @@ void Worker::InitLocalParams() {
         if (name2param.find(bps.name(i)) != name2param.end()) {
           name2param.at(bps.name(i))->FromProto(bps.blob(i));
           //  if load from pre-training params, reset version to start step
-          if(job_conf_.reset_param_version())
+          if (job_conf_.reset_param_version())
             name2param.at(bps.name(i))->set_version(job_conf_.step());
           else  // if resume training, use the same version as last checkpoint
             name2param.at(bps.name(i))->set_version(bps.version(i));
@@ -130,30 +129,6 @@ void Worker::InitLocalParams() {
   }
 }
 
-void Worker::Checkpoint(int step, shared_ptr<NeuralNet> net) {
-  if (grp_id_ == 0) {
-    BlobProtos bps;
-    for (auto layer: net->layers()){
-      if (layer->partition_id() == id_) {
-        for (auto param : layer->GetParams()) {
-          // only owners fill the memory of parameter values.
-          if(param->owner() == param->id()) {
-            auto *blob = bps.add_blob();
-            param->ToProto(blob);
-            bps.add_version(param->version());
-            bps.add_name(param->name());
-          }
-        }
-      }
-    }
-    char buf[256];
-    snprintf(buf, sizeof(buf), "%s/step%d-worker%d.bin",
-         Cluster::Get()->checkpoint_folder().c_str(), step, id_);
-    LOG(INFO) << "checkpoint to " << buf;
-    WriteProtoToBinaryFile(bps, buf);
-  }
-}
-
 void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) {
   dealer->Connect(kInprocRouterEndpoint);
   Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub));
@@ -166,13 +141,15 @@ void Worker::Run() {
   auto cluster = Cluster::Get();
   int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group();
   CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp));
-  dealer_ = new Dealer(2*thread_id_);
+  // TODO(wangsh): provide a unique sock id from cluster
+  dealer_ = new Dealer(0);
   ConnectStub(grp_id_, id_, dealer_, kWorkerParam);
   for (auto layer : train_net_->layers()) {
     if (layer->partition_id() == id_) {
       if (typeid(layer) == typeid(BridgeDstLayer)
           || typeid(layer) == typeid(BridgeSrcLayer)) {
-        layer_dealer_ = new Dealer(2*thread_id_+1);
+        // TODO(wangsh): provide a unique socket id from cluster
+        layer_dealer_ = new Dealer(1);
         ConnectStub(grp_id_, id_, layer_dealer_, kWorkerLayer);
         break;
       }
@@ -184,16 +161,15 @@ void Worker::Run() {
   Metric perf;
   while (!StopNow(step_)) {
     if (ValidateNow(step_) && validation_net_ != nullptr) {
-      //LOG(ERROR)<<"Validation at step "<<step;
+      // LOG(ERROR)<<"Validation at step "<<step;
       CollectAll(validation_net_, step_);
       Test(job_conf_.valid_steps(), kValidation, validation_net_);
     }
     if (TestNow(step_) && test_net_ != nullptr) {
-      //LOG(ERROR)<<"Test at step "<<step;
+      // LOG(ERROR)<<"Test at step "<<step;
       CollectAll(test_net_, step_);
       Test(job_conf_.test_steps(), kTest, test_net_);
     }
-
     if (CheckpointNow(step_)) {
       CollectAll(train_net_, step_);
       Checkpoint(step_, train_net_);
@@ -210,21 +186,41 @@ void Worker::Run() {
 
   // save the model
   Checkpoint(step_, train_net_);
-
   // clean up
   cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp);
   // notify the stub on worker stop
-  Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1,-1, kStub));
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_type(kStop);
   dealer_->Send(&msg);  // use param dealer to send the stop msg
-
   LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops";
 }
 
-
+void Worker::Checkpoint(int step, NeuralNet* net) {
+  if (grp_id_ == 0) {
+    BlobProtos bps;
+    for (auto layer : net->layers()) {
+      if (layer->partition_id() == id_) {
+        for (auto param : layer->GetParams()) {
+          // only owners fill the memory of parameter values.
+          if (param->owner() == param->id()) {
+            auto *blob = bps.add_blob();
+            param->ToProto(blob);
+            bps.add_version(param->version());
+            bps.add_name(param->name());
+          }
+        }
+      }
+    }
+    char buf[256];
+    snprintf(buf, sizeof(buf), "%s/step%d-worker%d.bin",
+             Cluster::Get()->checkpoint_folder().c_str(), step, id_);
+    LOG(INFO) << "checkpoint to " << buf;
+    WriteProtoToBinaryFile(bps, buf);
+  }
+}
 
 int Worker::Put(Param* param, int step) {
-  Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_trgt(ParamTrgt(param->owner(), 0), step);
   msg->set_type(kPut);
   dealer_->Send(&msg);
@@ -234,7 +230,7 @@ int Worker::Put(Param* param, int step) {
 int Worker::Get(Param* param, int step) {
   if (param->version() >= step)
     return 1;
-  Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_trgt(ParamTrgt(param->owner(), 0), step);
   msg->set_type(kGet);
   dealer_->Send(&msg);
@@ -243,28 +239,31 @@ int Worker::Get(Param* param, int step) {
 
 int Worker::Update(Param* param, int step) {
   param->set_local_version(param->version());
-  Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_trgt(ParamTrgt(param->owner(), 0), step);
   msg->set_type(kUpdate);
   dealer_->Send(&msg);
   return 1;
 }
 
-int Worker::CollectAll(shared_ptr<NeuralNet> net, int step) {
+int Worker::CollectAll(NeuralNet* net, int step) {
   auto& layers = net->layers();
-  for (auto& layer : layers){
-    if (layer->partition_id() == id_)
-      for (Param* p: layer->GetParams()) {
+  for (auto& layer : layers) {
+    if (layer->partition_id() == id_) {
+      for (Param* p : layer->GetParams()) {
         Collect(p, step);
       }
+    }
   }
   return 1;
 }
+
 int Worker::Collect(Param* param, int step) {
   while (param->version() <= param->local_version())
     std::this_thread::sleep_for(std::chrono::milliseconds(kCollectSleepTime));
   return 1;
 }
+
 void Worker::Report(const string& prefix, const Metric & perf) {
   Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_trgt(0, step_);
@@ -275,8 +274,8 @@ void Worker::Report(const string& prefix, const Metric & perf) {
   dealer_->Send(&msg);
 }
 
-void Worker::ReceiveBlobs(
-    bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net) {
+void Worker::ReceiveBlobs(bool data, bool grad, BridgeLayer* layer,
+                          NeuralNet* net) {
   while (!layer->ready()) {
     auto msg = layer_dealer_->Receive();
     CHECK_EQ(AddrGrp(msg->src()), grp_id_);
@@ -290,19 +289,19 @@ void Worker::ReceiveBlobs(
   }
 }
 
-void Worker::SendBlobs(
-    bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net) {
-  auto dst=layer->dstlayers().at(0);
-  Msg *msg=new Msg();
+void Worker::SendBlobs(bool data, bool grad, BridgeLayer* layer,
+                       NeuralNet* net) {
+  auto dst = layer->dstlayers().at(0);
+  Msg *msg = new Msg();
   msg->set_src(Addr(grp_id_, id_, kWorkerLayer));
   msg->set_dst(Addr(grp_id_, dst->partition_id(), kWorkerLayer));
   msg->AddFrame(dst->name().c_str(), dst->name().length());
-  auto const & blob=layer->data(nullptr);
-  msg->AddFrame(blob.cpu_data(), blob.count()*sizeof(float));
+  auto const & blob = layer->data(nullptr);
+  msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float));
   layer_dealer_->Send(&msg);
 }
 
-void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net) {
+void Worker::Test(int nsteps, Phase phase, NeuralNet* net) {
   Metric perf;
   for (int step = 0; step < nsteps; step++)
     TestOneBatch(step, phase, net, &perf);
@@ -312,96 +311,63 @@ void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net) {
     Report("Test", perf);
 }
 
-bool Worker::DisplayNow(int step) const {
-  return (job_conf_.disp_freq() > 0
-      && step >= job_conf_.disp_after()
-      && ((step - job_conf_.disp_after())
-        % job_conf_.disp_freq() == 0));
-}
-
-bool Worker::DisplayDebugInfo(int step) const {
-  return DisplayNow(step) && job_conf_.debug() && grp_id_ == 0;
-}
-bool Worker::StopNow(int step) const {
-  return step >= job_conf_.train_steps();
-}
-bool Worker::CheckpointNow(int step) const {
-  return (grp_id_ == 0
-      && job_conf_.checkpoint_freq() > 0
-      && step >= job_conf_.checkpoint_after()
-      && ((step - job_conf_.checkpoint_after())
-        % job_conf_.checkpoint_freq() == 0));
-}
-bool Worker::TestNow(const int step) const {
-  return (grp_id_ == 0
-      && job_conf_.test_freq() > 0
-      && job_conf_.test_steps() > 0
-      && step >= job_conf_.test_after()
-      && ((step - job_conf_.test_after())
-        % job_conf_.test_freq() == 0));
-}
-bool Worker::ValidateNow(const int step) const {
-  return (grp_id_ == 0
-      && job_conf_.valid_freq() > 0
-      && job_conf_.valid_steps() > 0
-      && step >= job_conf_.valid_after()
-      && ((step - job_conf_.valid_after())
-        % job_conf_.valid_freq() == 0));
+/****************************BPWorker**********************************/
+void BPWorker::TrainOneBatch(int step, Metric* perf) {
+  Forward(step, kTrain, train_net_, perf);
+  Backward(step, train_net_);
 }
 
-
-/****************************BPWorker**********************************/
-void BPWorker::Init(int thread_id, int group_id, int worker_id) {
-  Worker::Init(thread_id, group_id, worker_id);
+void BPWorker::TestOneBatch(int step, Phase phase, NeuralNet* net,
+                            Metric* perf) {
+  Forward(step, phase, net, perf);
 }
 
-void BPWorker::Forward(
-    int step, Phase phase, shared_ptr<NeuralNet> net, Metric* perf) {
+void BPWorker::Forward(int step, Phase phase, NeuralNet* net, Metric* perf) {
   for (auto& layer : net->layers()) {
     if (layer->partition_id() == id_) {
-      if (typeid(*layer) == typeid(BridgeDstLayer))  // recv data from other workers
-        ReceiveBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
+      // TODO(wangwei): enable this for model partition
+      // recv data from other workers
+      // if (typeid(*layer) == typeid(BridgeDstLayer))
+      //   ReceiveBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
       if (phase == kTrain) {
-        for (Param* p : layer->GetParams()) {  // wait until param is updated
+        // wait until param is updated
+        for (Param* p : layer->GetParams()) {
           Collect(p, step);
         }
       }
       layer->ComputeFeature(phase | kForward, perf);
-      if (typeid(*layer) == typeid(BridgeSrcLayer))  // send data to other workers
-        SendBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
+      // TODO(wangwei): enable this for model partition
+      // send data to other workers
+      // if (typeid(*layer) == typeid(BridgeSrcLayer))
+      //   SendBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
       if (DisplayDebugInfo(step))
         LOG(INFO) << layer->DebugString(step, phase | kForward);
     }
   }
 }
 
-void BPWorker::Backward(int step, shared_ptr<NeuralNet> net) {
-  auto& layers=net->layers();
-  for (auto it = layers.rbegin(); it != layers.rend(); it++){
+void BPWorker::Backward(int step, NeuralNet* net) {
+  auto& layers = net->layers();
+  for (auto it = layers.rbegin(); it != layers.rend(); it++) {
     Layer* layer = *it;
     if (layer->partition_id() == id_) {
-      // if (typeid(layer) == typeid(BridgeSrcLayer))  // send data to other workers
-      // ReceiveBlobs(false, true, layer, net);
+      // TODO(wangwei): enable this for model partition
+      // send data to other workers
+      // if (typeid(layer) == typeid(BridgeSrcLayer))
+      //   ReceiveBlobs(false, true, layer, net);
       layer->ComputeGradient(kTrain | kBackward, nullptr);
       if (DisplayDebugInfo(step))
         LOG(INFO) << layer->DebugString(step, kTrain | kBackward);
       for (Param* p : layer->GetParams())
         Update(p, step);
-      if (typeid(layer) == typeid(BridgeDstLayer))  // recv data from other workers
-        SendBlobs(false, true, dynamic_cast<BridgeDstLayer*>(layer), net);
+      // TODO(wangwei): enable this for model partition
+      // recv data from other workers
+      // if (typeid(layer) == typeid(BridgeDstLayer))
+      //   SendBlobs(false, true, dynamic_cast<BridgeDstLayer*>(layer), net);
     }
   }
 }
 
-void BPWorker::TrainOneBatch(int step, Metric* perf) {
-  Forward(step, kTrain, train_net_, perf);
-  Backward(step, train_net_);
-}
-
-void BPWorker::TestOneBatch(int step, Phase phase,
-    shared_ptr<NeuralNet> net, Metric* perf) {
-  Forward(step, phase, net, perf);
-}
 /****************************CDWorker**********************************/
 void CDWorker::TrainOneBatch(int step, Metric* perf) {
   const auto& layers = train_net_->layers();
@@ -432,8 +398,8 @@ void CDWorker::TrainOneBatch(int step, Metric* perf) {
   }
 }
 
-void CDWorker::TestOneBatch(int step, Phase phase,
-    shared_ptr<NeuralNet> net, Metric* perf) {
+void CDWorker::TestOneBatch(int step, Phase phase, NeuralNet* net,
+                            Metric* perf) {
   auto& layers = net->layers();
   for (auto *layer : layers)
     layer->ComputeFeature(kPositive, perf);
@@ -441,4 +407,5 @@ void CDWorker::TestOneBatch(int step, Phase phase,
     if (typeid(*layer) == typeid(RBMVisLayer))
       layer->ComputeFeature(kNegative | kTest, perf);
 }
+
 }  // namespace singa


[2/5] incubator-singa git commit: SINGA-21 Code review 5

Posted by wa...@apache.org.
SINGA-21 Code review 5

review server.h, server.cc
 - format code
 - remove thread_id field
 - rename variables
   nUpdate_ -> n_update_
   nPendingSync_ -> n_pending_sync_
 - fix a bug in HandleUpdate that using a *msg in a unknown state

TODO:
 - give each socket an unique id from cluster
 - buffer the un-processed message, intead of sending it back to stub


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d3e1fca3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d3e1fca3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d3e1fca3

Branch: refs/heads/master
Commit: d3e1fca38b97e06ca113369d9a4f583750105a39
Parents: 3161175
Author: wang sheng <wa...@gmail.com>
Authored: Tue Sep 22 17:22:33 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Tue Sep 22 17:28:41 2015 +0800

----------------------------------------------------------------------
 include/trainer/server.h  |  74 ++++++++++++++---------------
 include/trainer/trainer.h |   2 +-
 src/trainer/server.cc     | 104 +++++++++++++++++++++--------------------
 src/trainer/trainer.cc    |   6 +--
 4 files changed, 92 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3e1fca3/include/trainer/server.h
----------------------------------------------------------------------
diff --git a/include/trainer/server.h b/include/trainer/server.h
index 3f1c12d..3f3539a 100644
--- a/include/trainer/server.h
+++ b/include/trainer/server.h
@@ -19,17 +19,20 @@
 *
 *************************************************************/
 
-#ifndef INCLUDE_TRAINER_SERVER_H_
-#define INCLUDE_TRAINER_SERVER_H_
+#ifndef SINGA_TRAINER_SERVER_H_
+#define SINGA_TRAINER_SERVER_H_
+
 #include <memory>
 #include <unordered_map>
-#include <utils/param.h>
-#include <utils/updater.h>
-#include "proto/job.pb.h"
+#include <vector>
 #include "communication/socket.h"
+#include "proto/job.pb.h"
+#include "utils/param.h"
+#include "utils/updater.h"
 
 namespace singa {
-/* Repsond to worker's get/put/udpate request, and periodically syncing with
+
+ /* Repsond to worker's get/put/udpate request, and periodically syncing with
   * other servers.
   *
   * Normally, the Server creates a response message for each request which
@@ -39,33 +42,26 @@ namespace singa {
   * it just sends it to the router. The router will decide to re-send the
   * request to the server or send it to the worker.
   */
-class Server{
+class Server {
  public:
-  Server(int thread_id, int group_id, int server_id);
+  Server(int group_id, int server_id);
   virtual ~Server();
-  void Setup(const UpdaterProto& proto,
-      const std::vector<int>& slice2group,
-      const std::vector<int>& slice2server);
+  void Setup(const UpdaterProto& proto, const std::vector<int>& slice2group,
+             const std::vector<int>& slice2server);
   void Run();
-  const int grp_id() const {
-    return grp_id_;
-  }
-  const int id() const {
-    return id_;
-  }
+  inline int grp_id() const { return grp_id_; }
+  inline int id() const { return id_; }
 
  protected:
-
- 	/**
-	 * Process GET request.
+  /**
+   * Process GET request.
    *
    * @return the orignal message or a response message which contains the values
    * of the Param with the request version.
    */
-	virtual Msg* HandleGet(Msg** msg);
-
-	/**
-	 * Process Update request.
+  virtual Msg* HandleGet(Msg** msg);
+  /**
+   * Process Update request.
    *
    * It waits until received the gradients from all workers from the same worker
    * group. After updating, it responses to each sender with the new Param
@@ -86,16 +82,14 @@ class Server{
    * @return the orignal message or response message
    */
   const std::vector<Msg*> HandleUpdate(Msg **msg);
-
-	/**
-	 * Process PUT request.
+  /**
+   * Process PUT request.
    *
    * @return the original message or response message. If we don't want to
    * acknowledge the put request, then return nullptr.
-	 */
-	virtual Msg* HandlePut(Msg **msg);
-
-	/**
+   */
+  virtual Msg* HandlePut(Msg **msg);
+  /**
    * Handle sync request from other server groups.
    *
    * It adds updates of Param (slice) from other server groups directly to
@@ -106,8 +100,7 @@ class Server{
    * @param msg request msg containing the parameter updates
    * @return response msg that contains the fresh parameter values.
    */
-	virtual Msg* HandleSyncRequest(Msg** msg);
-
+  virtual Msg* HandleSyncRequest(Msg** msg);
   /**
    * Handle sync response.
    *
@@ -121,17 +114,20 @@ class Server{
   void HandleSyncResponse(Msg** msg);
 
  protected:
-  int thread_id_,grp_id_, id_;
-  Updater* updater_;
+  int grp_id_ = -1;
+  int id_ = -1;
+  Updater* updater_ = nullptr;
   //!< map from slice ID to slice and deleted in the destructor
   std::unordered_map<int, ParamEntry*> shard_;
   std::vector<int> slice2group_, slice2server_;
   //!< num of updates from last sync with master server group for a param/slice
-  std::vector<int> nUpdates_;
+  std::vector<int> n_updates_;
   //!< num of sync requests that have not been responded
-  std::vector<int> nPendingSync_;
+  std::vector<int> n_pending_sync_;
   std::vector<Blob<float>> last_sync_;
   std::unordered_map<int, std::vector<Msg*>> buffer_requests_;
 };
-} /* Server */
-#endif //INCLUDE_TRAINER_SERVER_H_
+
+}  // namespace singa
+
+#endif  // SINGA_TRAINER_SERVER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3e1fca3/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 6630e51..0b03dea 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -73,7 +73,7 @@ class Trainer{
    * @param jobConf
    * @return server instances
    */
-  vector<Server*> CreateServers(int nthread, const JobProto& jobConf);
+  vector<Server*> CreateServers(const JobProto& jobConf);
   /**
    * Create workers instances.
    * @param nthread total num of threads in current procs which is used to

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3e1fca3/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 18fe7d2..29f6a68 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -22,30 +22,30 @@
 #include <thread>
 #include <chrono>
 #include "mshadow/tensor.h"
+#include "proto/common.pb.h"
 #include "trainer/server.h"
 #include "utils/param.h"
 #include "utils/singleton.h"
 #include "utils/factory.h"
 #include "utils/cluster.h"
-#include "proto/common.pb.h"
 
 namespace singa {
 
 using namespace mshadow;
 using std::vector;
 
-Server::Server(int thread_id,int group_id, int server_id):
-  thread_id_(thread_id),grp_id_(group_id), id_(server_id){
+Server::Server(int group_id, int server_id) {
+  grp_id_ = group_id;
+  id_ = server_id;
 }
 
-void Server::Setup(const UpdaterProto& proto,
-    const vector<int>& slice2group,
-    const vector<int>& slice2server) {
+void Server::Setup(const UpdaterProto& proto, const vector<int>& slice2group,
+                   const vector<int>& slice2server) {
   updater_ = Updater::Create(proto);
   slice2group_ = slice2group;
   slice2server_ = slice2server;
-  nUpdates_.resize(slice2group_.size(), 0);
-  nPendingSync_.resize(slice2group_.size(), 0);
+  n_updates_.resize(slice2group_.size(), 0);
+  n_pending_sync_.resize(slice2group_.size(), 0);
   last_sync_.resize(slice2group_.size());
 }
 
@@ -57,14 +57,14 @@ Server::~Server() {
       delete param;
 }
 
-void Stop(void * running) {
+void Stop(void* running) {
   *static_cast<bool *>(running) = false;
 }
 
 void Server::Run() {
   LOG(ERROR) << "Server (group = " << grp_id_ <<", id = " << id_ << ") start";
-
-  auto dealer = new Dealer(2*thread_id_);
+  // TODO(wangsh): give each dealer a unique id
+  auto dealer = new Dealer(0);
   CHECK(dealer->Connect(kInprocRouterEndpoint));
   Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
   ping->set_type(kConnect);
@@ -77,7 +77,7 @@ void Server::Run() {
   // start recv loop and process requests
   while (running) {
     // must use poller here; otherwise Receive() gets stuck after workers stop.
-    auto *sock = poll.Wait(cluster->poll_time());
+    auto* sock = poll.Wait(cluster->poll_time());
     if (poll.Terminated()) {
       LOG(ERROR) << "Connection broken!";
       exit(0);
@@ -85,35 +85,35 @@ void Server::Run() {
       continue;
     }
     Msg* msg = dealer->Receive();
-    if (msg == nullptr) break; //  interrupted
+    if (msg == nullptr) break;  // interrupted
     Msg* response = nullptr;
     int type = msg->type();
     int slice_id = SliceID(msg->trgt_val());
     if (type == kPut) {
       response = HandlePut(&msg);
+    } else if (shard_.find(slice_id) == shard_.end()) {
+      // TODO(wangsh): buffer the msg instead, and process it after the
+      //               corresponding put request is done
+      // delay the processing by re-queue the msg. May sleep for a while?
+      response = msg;
     } else {
-      if (shard_.find(slice_id) == shard_.end()) {
-        // delay the processing by re-queue the msg. May sleep for a while?
-        response = msg;
-      }  else {
-        switch (type) {
-          case kGet:
-            response = HandleGet(&msg);
-            break;
-          case kUpdate:
-            for (auto reply : HandleUpdate(&msg))
-              dealer->Send(&reply);
-            break;
-          case kSyncRequest:
-            response = HandleSyncRequest(&msg);
-            break;
-          case kSyncResponse:
-            HandleSyncResponse(&msg);
-            break;
-          default:
-            LOG(ERROR)<<"Unknown message type "<<type;
-            break;
-        }
+      switch (type) {
+        case kGet:
+          response = HandleGet(&msg);
+          break;
+        case kUpdate:
+          for (auto reply : HandleUpdate(&msg))
+            dealer->Send(&reply);
+          break;
+        case kSyncRequest:
+          response = HandleSyncRequest(&msg);
+          break;
+        case kSyncResponse:
+          HandleSyncResponse(&msg);
+          break;
+        default:
+          LOG(ERROR) << "Unknown message type: " << type;
+          break;
       }
     }
     if (response != nullptr)
@@ -125,7 +125,6 @@ void Server::Run() {
   msg->set_type(kStop);
   dealer->Send(&msg);
   std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-
   LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
   delete dealer;
 }
@@ -154,8 +153,8 @@ Msg* Server::HandlePut(Msg **msg) {
     last_sync_[slice_id].ReshapeLike(param->data());
     last_sync_[slice_id].CopyFrom(param->data());
   }
-  LOG(INFO)<<"server (group = " << grp_id_ << ", id = " << id_ <<") put slice="
-    << slice_id << " size=" << param->size();
+  LOG(INFO) << "server (group = " << grp_id_ << ", id = " << id_
+            <<") put slice=" << slice_id << " size=" << param->size();
   return response;
 }
 
@@ -163,9 +162,9 @@ Msg* Server::HandleGet(Msg **msg) {
   int val = (*msg)->trgt_val();
   auto param = shard_.at(SliceID(val))->shares.at(0);
   // re-queue the request if the param is not updated to the required version
-  if(param->version()<(*msg)->trgt_version())
+  if (param->version() < (*msg)->trgt_version()) {
     return *msg;
-  else {
+  } else {
     // LOG(ERROR) << "get " << slice << " from "<<(*msg)->src_first();
     auto reply = param->HandleGetMsg(msg, false);
     reply->set_trgt(val, param->version());
@@ -183,12 +182,14 @@ const vector<Msg*> Server::HandleUpdate(Msg **msg) {
   (*msg)->ParseFormatFrame("i", &num_update);
   (*msg)->FirstFrame();
   entry->num_update += num_update;
-  // LOG(ERROR) << "update "<< sliceid << " from " << AddrGrp((*msg)->src()) << ", " << num_update << " total " << entry->num_total;
+  // LOG(ERROR) << "update "<< sliceid << " from " << AddrGrp((*msg)->src())
+  //            << ", " << num_update << " total " << entry->num_total;
   // do update until recv gradients from all shares of this param/slice
   if (entry->num_update >= entry->num_total) {
     CHECK_EQ(entry->num_update, entry->num_total);
     auto& request = buffer_requests_.at(sliceid);
     int step = (*msg)->trgt_version();
+    int trgt_val = (*msg)->trgt_val();
     auto param = entry->shares.at(0);
     // extract and aggregate gradients
     param->ParseUpdateMsgs(request);
@@ -196,16 +197,16 @@ const vector<Msg*> Server::HandleUpdate(Msg **msg) {
     param->set_local_version(param->local_version() + 1);
     // response to all shares of this param
     for (auto response : param->GenUpdateResponseMsgs(&request, false)) {
-      response->set_trgt((*msg)->trgt_val(), param->local_version());
+      response->set_trgt(trgt_val, param->local_version());
       ret.push_back(response);
     }
     entry->num_update = 0;
-    nUpdates_[sliceid]++;
+    n_updates_[sliceid]++;
     // sync with master group after at least sync_freq local updates
     // the last check is to avoid sending msg to stopped servers
     if (slice2group_[sliceid] != grp_id_
-        && nUpdates_[sliceid] >= Cluster::Get()->sync_freq()
-        && nPendingSync_[sliceid] <= Cluster::Get()->sync_freq()) {
+        && n_updates_[sliceid] >= Cluster::Get()->sync_freq()
+        && n_pending_sync_[sliceid] <= Cluster::Get()->sync_freq()) {
       auto shape = Shape1(param->size());
       Tensor<cpu, 1> tmp(last_sync_[sliceid].mutable_cpu_data(), shape);
       Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
@@ -213,14 +214,15 @@ const vector<Msg*> Server::HandleUpdate(Msg **msg) {
       int addr = Addr(slice2group_[sliceid], slice2server_[sliceid], kServer);
       Msg* sync = new Msg(Addr(grp_id_, id_, kServer), addr);
       sync->set_type(kSyncRequest);
-      sync->set_trgt((*msg)->trgt_val(), param->local_version());
+      sync->set_trgt(trgt_val, param->local_version());
       sync->AddFrame(tmp.dptr, param->size() * sizeof(float));
       Copy(tmp, cur);
       ret.push_back(sync);
-      nUpdates_[sliceid] = 0;
-      nPendingSync_[sliceid]++;
+      n_updates_[sliceid] = 0;
+      n_pending_sync_[sliceid]++;
     }
   }
+  // message already pushed to buffer, just need to reset the pointer
   *msg = nullptr;
   return ret;
 }
@@ -247,14 +249,14 @@ void Server::HandleSyncResponse(Msg **msg) {
   Msg* msgg = *msg;
   int slice = SliceID(msgg->trgt_val());
   auto param = shard_.at(slice)->shares.at(0);
-  auto shape=Shape1(param->size());
+  auto shape = Shape1(param->size());
   Tensor<cpu, 1> prev(last_sync_[param->id()].mutable_cpu_data(), shape);
   Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
   Tensor<cpu, 1> master(static_cast<float*>(msgg->FrameData()), shape);
   cur += master - prev;  // cur = master + (cur - prev);
   Copy(prev, cur);
   DeleteMsg(msg);
-  nPendingSync_[slice]--;
+  n_pending_sync_[slice]--;
 }
 
-} /* singa */
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3e1fca3/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 4a4c183..c928d91 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -144,7 +144,7 @@ void Trainer::SetupWorkerServer(
     server->Setup(job_conf.updater(), slice2group, slice2server_);
 }
 
-vector<Server*> Trainer::CreateServers(int nthreads, const JobProto& job) {
+vector<Server*> Trainer::CreateServers(const JobProto& job) {
   auto cluster = Cluster::Get();
   vector<Server*> servers;
   if (!cluster->has_server())
@@ -160,7 +160,7 @@ vector<Server*> Trainer::CreateServers(int nthreads, const JobProto& job) {
   int gstart = rng[0], gend = rng[1], start = rng[2], end = rng[3];
   for (int gid = gstart; gid < gend; gid++) {
     for (int sid = start; sid < end; sid++) {
-      auto server = new Server(nthreads++, gid, sid);
+      auto server = new Server(gid, sid);
       servers.push_back(server);
     }
   }
@@ -244,7 +244,7 @@ void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
   int nthreads = 1;
   const vector<Worker*> workers = CreateWorkers(nthreads, *job);
   nthreads += workers.size();
-  const vector<Server*> servers = CreateServers(nthreads, *job);
+  const vector<Server*> servers = CreateServers(*job);
   SetupWorkerServer(*job, workers, servers);
 
 #ifdef USE_MPI


[5/5] incubator-singa git commit: SINGA-21 Code review 5

Posted by wa...@apache.org.
SINGA-21 Code review 5

Move the implementation of template functions back to driver.h from driver.cc.
Otherwise there would be link errors when users register their own classes.
Because the registration functions are instantiated during compiling the
user code. diriver.h must contain the declarations and the
implementations of these template functions to instantiate them for users classes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/0c6e5c69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/0c6e5c69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/0c6e5c69

Branch: refs/heads/master
Commit: 0c6e5c692bb07b481baca004b07b08b2a0601c6e
Parents: 366e6a8
Author: Wei Wang <wa...@comp.nus.edu.sg>
Authored: Wed Sep 23 23:02:01 2015 +0800
Committer: Wei Wang <wa...@comp.nus.edu.sg>
Committed: Wed Sep 23 23:02:01 2015 +0800

----------------------------------------------------------------------
 include/driver.h | 59 +++++++++++++++++++++++++++++++++++++++++++++++++--
 src/driver.cc    | 48 ++---------------------------------------
 2 files changed, 59 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/0c6e5c69/include/driver.h
----------------------------------------------------------------------
diff --git a/include/driver.h b/include/driver.h
index 563be77..b33c7cc 100644
--- a/include/driver.h
+++ b/include/driver.h
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,12 @@
 
 #include "proto/job.pb.h"
 #include "proto/singa.pb.h"
+#include "utils/factory.h"
+#include "utils/param.h"
+#include "utils/singleton.h"
+#include "utils/updater.h"
+#include "neuralnet/layer.h"
+#include "trainer/worker.h"
 
 namespace singa {
 
@@ -120,6 +126,55 @@ class Driver {
   SingaProto singa_conf_;
 };
 
+/************* Implementation of template functions*************************
+* Must put the implementation in driver.h file instead of driver.cc.
+* Otherwise there would be linking error caused by unknown registration
+* functions, becuase these function cannot be generated merely based on its
+* declearation in driver.h.
+*/
+
+template<typename Subclass, typename Type>
+int Driver::RegisterLayer(const Type& type) {
+  auto factory = Singleton<Factory<singa::Layer>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, Layer));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterParam(const Type& type) {
+  auto factory = Singleton<Factory<singa::Param>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, Param));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterParamGenerator(const Type& type) {
+  auto factory = Singleton<Factory<singa::ParamGenerator>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, ParamGenerator));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterUpdater(const Type& type) {
+  auto factory = Singleton<Factory<singa::Updater>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, Updater));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterLRGenerator(const Type& type) {
+  auto factory = Singleton<Factory<singa::LRGenerator>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, LRGenerator));
+  return 1;
+}
+
+template<typename Subclass, typename Type>
+int Driver::RegisterWorker(const Type& type) {
+  auto factory = Singleton<Factory<singa::Worker>>::Instance();
+  factory->Register(type, CreateInstance(Subclass, Worker));
+  return 1;
+}
+
 }  // namespace singa
 
 #endif  // SINGA_DRIVER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/0c6e5c69/src/driver.cc
----------------------------------------------------------------------
diff --git a/src/driver.cc b/src/driver.cc
index 42a1330..41b2342 100644
--- a/src/driver.cc
+++ b/src/driver.cc
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,12 +24,9 @@
 #include <cblas.h>
 #include <glog/logging.h>
 #include <string>
-#include "neuralnet/neuralnet.h"
 #include "neuralnet/layer.h"
 #include "trainer/trainer.h"
 #include "utils/common.h"
-#include "utils/factory.h"
-#include "utils/singleton.h"
 #include "utils/tinydir.h"
 
 namespace singa {
@@ -110,47 +107,6 @@ void Driver::Init(int argc, char **argv) {
   RegisterParamGenerator<UniformSqrtFanInOutGen>(kUniformSqrtFanInOut);
 }
 
-template<typename Subclass, typename Type>
-int Driver::RegisterLayer(const Type& type) {
-  auto factory = Singleton<Factory<singa::Layer>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, Layer));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterParam(const Type& type) {
-  auto factory = Singleton<Factory<singa::Param>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, Param));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterParamGenerator(const Type& type) {
-  auto factory = Singleton<Factory<singa::ParamGenerator>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, ParamGenerator));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterUpdater(const Type& type) {
-  auto factory = Singleton<Factory<singa::Updater>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, Updater));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterLRGenerator(const Type& type) {
-  auto factory = Singleton<Factory<singa::LRGenerator>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, LRGenerator));
-  return 1;
-}
-
-template<typename Subclass, typename Type>
-int Driver::RegisterWorker(const Type& type) {
-  auto factory = Singleton<Factory<singa::Worker>>::Instance();
-  factory->Register(type, CreateInstance(Subclass, Worker));
-  return 1;
-}
 
 void Driver::Submit(bool resume, const JobProto& jobConf) {
   if (singa_conf_.has_log_dir())