Posted to commits@singa.apache.org by wa...@apache.org on 2015/09/29 05:23:56 UTC

[5/5] incubator-singa git commit: SINGA-41: Support single node single GPU training

SINGA-41: Support single node single GPU training

Rebased onto version 0.1.0.
Tested with the cifar10 example.
The -G compilation option degrades performance.
GPU speed is currently similar to CConvolution+CPooling on the CPU.
The speed still needs to be optimized.
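
One mechanism behind the single-GPU support is visible in the include/utils/blob.h hunk below: Blob::mutable_xpu_data() returns GPU memory when the code is built without CPU_ONLY and CPU memory otherwise, so the same layer code can target either device. The following is a minimal, self-contained C++ sketch of that CPU_ONLY switch; SimpleSyncedMemory is a hypothetical stand-in for SINGA's SyncedMemory class, not the actual implementation.

#include <cstddef>
#include <cstdlib>
#include <iostream>
#ifndef CPU_ONLY
#include <cuda_runtime.h>
#endif

// Hypothetical stand-in for SINGA's SyncedMemory: owns a host buffer and,
// when built with CUDA (no CPU_ONLY), a device buffer of the same size.
class SimpleSyncedMemory {
 public:
  explicit SimpleSyncedMemory(size_t size) : size_(size) {
    cpu_ptr_ = std::malloc(size_);
#ifndef CPU_ONLY
    cudaMalloc(&gpu_ptr_, size_);  // error checking omitted in this sketch
#endif
  }
  ~SimpleSyncedMemory() {
    std::free(cpu_ptr_);
#ifndef CPU_ONLY
    cudaFree(gpu_ptr_);
#endif
  }
  void* mutable_cpu_data() { return cpu_ptr_; }
#ifndef CPU_ONLY
  void* mutable_gpu_data() { return gpu_ptr_; }
#endif

  // Same pattern as Blob::mutable_xpu_data() in the blob.h hunk: callers
  // always ask for "xpu" data and the build flag decides which memory they get.
  void* mutable_xpu_data() {
#ifndef CPU_ONLY
    return mutable_gpu_data();
#else
    return mutable_cpu_data();
#endif
  }

 private:
  size_t size_;
  void* cpu_ptr_ = nullptr;
#ifndef CPU_ONLY
  void* gpu_ptr_ = nullptr;
#endif
};

int main() {
  SimpleSyncedMemory mem(1024);
  std::cout << "xpu data lives at " << mem.mutable_xpu_data() << std::endl;
  return 0;
}

Building with -DCPU_ONLY (e.g. g++ -DCPU_ONLY sketch.cc) keeps the accessor on host memory, while an nvcc build without that flag routes the same call to device memory.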


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/9dbdfd68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/9dbdfd68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/9dbdfd68

Branch: refs/heads/gpu
Commit: 9dbdfd68695e8d3a30cafc14d941a36ea0bf55d6
Parents: 1770377
Author: Wei Wang <wa...@comp.nus.edu.sg>
Authored: Tue Sep 29 11:18:33 2015 +0800
Committer: Wei Wang <wa...@comp.nus.edu.sg>
Committed: Tue Sep 29 11:23:21 2015 +0800

----------------------------------------------------------------------
 examples/cifar10/job.conf         |  12 +-
 include/utils/blob.h              |  14 +-
 src/neuralnet/connection_layer.cc |   6 +-
 src/neuralnet/neuron_layer.cu     | 203 ++++++-------
 src/trainer/server.cc             | 256 ----------------
 src/trainer/trainer.cc            | 521 ---------------------------------
 src/utils/blob.cc                 |   5 +-
 7 files changed, 120 insertions(+), 897 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/examples/cifar10/job.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf
index 343d969..91688a4 100644
--- a/examples/cifar10/job.conf
+++ b/examples/cifar10/job.conf
@@ -57,7 +57,7 @@ neuralnet {
 
   layer {
     name: "conv1"
-    type: kCConvolution
+    type: kConvolution
     srclayers: "rgb"
     convolution_conf {
       num_filters: 32
@@ -84,7 +84,7 @@ neuralnet {
 
   layer {
     name: "pool1"
-    type: kCPooling
+    type: kPooling
     srclayers: "conv1"
     pooling_conf {
       pool: MAX
@@ -109,7 +109,7 @@ neuralnet {
   }
   layer {
     name: "conv2"
-    type: kCConvolution
+    type: kConvolution
     srclayers: "norm1"
     convolution_conf {
       num_filters: 32
@@ -140,7 +140,7 @@ neuralnet {
   }
   layer {
     name: "pool2"
-    type: kCPooling
+    type: kPooling
     srclayers: "relu2"
     pooling_conf {
       pool: AVG
@@ -160,7 +160,7 @@ neuralnet {
   }
   layer {
     name: "conv3"
-    type: kCConvolution
+    type: kConvolution
     srclayers: "norm2"
     convolution_conf {
       num_filters: 64
@@ -190,7 +190,7 @@ neuralnet {
   }
   layer {
     name: "pool3"
-    type: kCPooling
+    type: kPooling
     srclayers: "relu3"
     pooling_conf {
       pool: AVG

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/include/utils/blob.h
----------------------------------------------------------------------
diff --git a/include/utils/blob.h b/include/utils/blob.h
index 903845d..754abbe 100644
--- a/include/utils/blob.h
+++ b/include/utils/blob.h
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -184,11 +184,11 @@ class Blob {
   }
   inline Dtype* mutable_xpu_data() {
     CHECK(data_);
-	#ifndef CPU_ONLY
-		return static_cast<Dtype*>(data_->mutable_gpu_data());
-	#else
-	    return static_cast<Dtype*>(data_->mutable_cpu_data());
-	#endif
+  #ifndef CPU_ONLY
+    return static_cast<Dtype*>(data_->mutable_gpu_data());
+  #else
+    return static_cast<Dtype*>(data_->mutable_cpu_data());
+  #endif
   }
   /// @brief Compute the sum of absolute values (L1 norm) of the data.
   Dtype asum_data() const;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/neuralnet/connection_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/connection_layer.cc b/src/neuralnet/connection_layer.cc
index 750a511..acf243d 100644
--- a/src/neuralnet/connection_layer.cc
+++ b/src/neuralnet/connection_layer.cc
@@ -27,9 +27,9 @@ using std::vector;
 /********* Implementation for BridgeDstLayer **************/
 void BridgeDstLayer::Setup(const LayerProto& proto,
     const vector<Layer*>& srclayers) {
-  Layer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  data_.Reshape(srclayers_[0]->data(this).shape());
+  Layer::Setup(proto, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  data_.Reshape(srclayers[0]->data(this).shape());
   grad_.ReshapeLike(data_);
 }
 /************* Implementation for ConcateLayer ***********/

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/neuralnet/neuron_layer.cu
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuron_layer.cu b/src/neuralnet/neuron_layer.cu
index affb02f..a93ceb9 100644
--- a/src/neuralnet/neuron_layer.cu
+++ b/src/neuralnet/neuron_layer.cu
@@ -23,118 +23,120 @@
 
 #include <glog/logging.h>
 #include <algorithm>
+#include <string>
+#include <vector>
 #include "utils/singleton.h"
 #include "mshadow/tensor.h"
 #include "mshadow/cxxnet_op.h"
 
 namespace singa {
 
-  using namespace mshadow;
-  using namespace mshadow::expr;
-  using mshadow::cpu;
-  using mshadow::xpu;
-
-  using mshadow::Shape;
-  using mshadow::Shape1;
-  using mshadow::Shape2;
-  using mshadow::Shape3;
-  using mshadow::Shape4;
-  using mshadow::Tensor;
-
-  using std::string;
-  using std::vector;
-
-  inline Tensor<cpu, 4> Tensor4CPU(Blob<float>* blob) {
-    const vector<int>& shape = blob->shape();
-    Tensor<cpu, 4> tensor(blob->mutable_cpu_data(),
-        Shape4(shape[0], shape[1], shape[2], shape[3]));
-    return tensor;
-  }
+using namespace mshadow;
+using namespace mshadow::expr;
+using mshadow::cpu;
+using mshadow::xpu;
 
-  inline Tensor<cpu, 3> Tensor3CPU(Blob<float>* blob) {
-    const vector<int>& shape = blob->shape();
-    Tensor<cpu, 3> tensor(blob->mutable_cpu_data(),
-        Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
-    return tensor;
-  }
+using mshadow::Shape;
+using mshadow::Shape1;
+using mshadow::Shape2;
+using mshadow::Shape3;
+using mshadow::Shape4;
+using mshadow::Tensor;
 
-  inline Tensor<cpu, 2> Tensor2CPU(Blob<float>* blob) {
-    const vector<int>& shape = blob->shape();
-    Tensor<cpu, 2> tensor(blob->mutable_cpu_data(),
-        Shape2(shape[0], blob->count() / shape[0]));
-    return tensor;
-  }
+using std::string;
+using std::vector;
 
-  inline Tensor<cpu, 1> Tensor1CPU(Blob<float>* blob) {
-    Tensor<cpu, 1> tensor(blob->mutable_cpu_data(), Shape1(blob->count()));
-    return tensor;
-  }
+inline Tensor<cpu, 4> Tensor4CPU(Blob<float>* blob) {
+  const vector<int>& shape = blob->shape();
+  Tensor<cpu, 4> tensor(blob->mutable_cpu_data(),
+      Shape4(shape[0], shape[1], shape[2], shape[3]));
+  return tensor;
+}
 
-  inline Tensor<xpu, 4> Tensor4(Blob<float>* blob) {
-    const vector<int>& shape = blob->shape();
-    Tensor<xpu, 4> tensor(blob->mutable_xpu_data(),
-        Shape4(shape[0], shape[1], shape[2], shape[3]));
-    return tensor;
-  }
+inline Tensor<cpu, 3> Tensor3CPU(Blob<float>* blob) {
+  const vector<int>& shape = blob->shape();
+  Tensor<cpu, 3> tensor(blob->mutable_cpu_data(),
+      Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
+  return tensor;
+}
 
-  inline Tensor<xpu, 3> Tensor3(Blob<float>* blob){
-    const vector<int>& shape = blob->shape();
-    Tensor<xpu, 3> tensor(blob->mutable_xpu_data(),
-        Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
-    return tensor;
-  }
-  inline Tensor<xpu, 2> Tensor2(Blob<float>* blob){
-    const vector<int>& shape = blob->shape();
-    Tensor<xpu, 2> tensor(blob->mutable_xpu_data(),
-        Shape2(shape[0], blob->count() / shape[0]));
-    return tensor;
-  }
-  inline Tensor<xpu, 1> Tensor1(Blob<float>* blob){
-    Tensor<xpu, 1> tensor(blob->mutable_xpu_data(), Shape1(blob->count()));
-    return tensor;
-  }
+inline Tensor<cpu, 2> Tensor2CPU(Blob<float>* blob) {
+  const vector<int>& shape = blob->shape();
+  Tensor<cpu, 2> tensor(blob->mutable_cpu_data(),
+      Shape2(shape[0], blob->count() / shape[0]));
+  return tensor;
+}
 
-  /************ Implementation for ConvolutionLayer*************************/
-  ConvolutionLayer::~ConvolutionLayer() {
-    delete weight_;
-    delete bias_;
-  }
-  void ConvolutionLayer::Setup(const LayerProto& conf,
-      const vector<Layer*>& srclayers) {
-    CHECK_EQ(srclayers.size(), 1);
-    Layer::Setup(conf, srclayers);
-    ConvolutionProto conv_conf = conf.convolution_conf();
-    kernel_ = conv_conf.kernel();
-    CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
-    pad_ = conv_conf.pad();
-    stride_ = conv_conf.stride();
-    num_filters_ = conv_conf.num_filters();
-    if (partition_dim() > 0)
-      num_filters_ /= srclayers.at(0)->num_partitions();
-    const vector<int>& srcshape = srclayers[0]->data(this).shape();
-    int dim = srcshape.size();
-    CHECK_GT(dim, 2);
-    width_ = srcshape[dim - 1];
-    height_ = srcshape[dim - 2];
-    if (dim > 3)
-      channels_ = srcshape[dim - 3];
-    else if (dim > 2)
-      channels_ = 1;
-    batchsize_ = srcshape[0];
-    conv_height_ = (height_ + 2 * pad_ - kernel_) / stride_ + 1;
-    conv_width_ = (width_ + 2 * pad_ - kernel_) / stride_ + 1;
-    col_height_ = channels_ * kernel_ * kernel_;
-    col_width_ = conv_height_ * conv_width_;
-    vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_};
-    data_.Reshape(shape);
-    grad_.Reshape(shape);
-    col_data_.Reshape(vector<int>{col_height_, col_width_});
-    col_grad_.Reshape(vector<int>{col_height_, col_width_});
-    weight_ = Param::Create(conf.param(0));
-    bias_ = Param::Create(conf.param(1));
-    weight_->Setup(vector<int>{num_filters_, col_height_});
-    bias_->Setup(vector<int>{num_filters_});
-  }
+inline Tensor<cpu, 1> Tensor1CPU(Blob<float>* blob) {
+  Tensor<cpu, 1> tensor(blob->mutable_cpu_data(), Shape1(blob->count()));
+  return tensor;
+}
+
+inline Tensor<xpu, 4> Tensor4(Blob<float>* blob) {
+  const vector<int>& shape = blob->shape();
+  Tensor<xpu, 4> tensor(blob->mutable_xpu_data(),
+      Shape4(shape[0], shape[1], shape[2], shape[3]));
+  return tensor;
+}
+
+inline Tensor<xpu, 3> Tensor3(Blob<float>* blob) {
+  const vector<int>& shape = blob->shape();
+  Tensor<xpu, 3> tensor(blob->mutable_xpu_data(),
+      Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
+  return tensor;
+}
+inline Tensor<xpu, 2> Tensor2(Blob<float>* blob) {
+  const vector<int>& shape = blob->shape();
+  Tensor<xpu, 2> tensor(blob->mutable_xpu_data(),
+      Shape2(shape[0], blob->count() / shape[0]));
+  return tensor;
+}
+inline Tensor<xpu, 1> Tensor1(Blob<float>* blob) {
+  Tensor<xpu, 1> tensor(blob->mutable_xpu_data(), Shape1(blob->count()));
+  return tensor;
+}
+
+/************ Implementation for ConvolutionLayer*************************/
+ConvolutionLayer::~ConvolutionLayer() {
+  delete weight_;
+  delete bias_;
+}
+void ConvolutionLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  CHECK_EQ(srclayers.size(), 1);
+  Layer::Setup(conf, srclayers);
+  ConvolutionProto conv_conf = conf.convolution_conf();
+  kernel_ = conv_conf.kernel();
+  CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
+  pad_ = conv_conf.pad();
+  stride_ = conv_conf.stride();
+  num_filters_ = conv_conf.num_filters();
+  if (partition_dim() > 0)
+    num_filters_ /= srclayers.at(0)->num_partitions();
+  const vector<int>& srcshape = srclayers[0]->data(this).shape();
+  int dim = srcshape.size();
+  CHECK_GT(dim, 2);
+  width_ = srcshape[dim - 1];
+  height_ = srcshape[dim - 2];
+  if (dim > 3)
+    channels_ = srcshape[dim - 3];
+  else if (dim > 2)
+    channels_ = 1;
+  batchsize_ = srcshape[0];
+  conv_height_ = (height_ + 2 * pad_ - kernel_) / stride_ + 1;
+  conv_width_ = (width_ + 2 * pad_ - kernel_) / stride_ + 1;
+  col_height_ = channels_ * kernel_ * kernel_;
+  col_width_ = conv_height_ * conv_width_;
+  vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_};
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+  col_data_.Reshape(vector<int>{col_height_, col_width_});
+  col_grad_.Reshape(vector<int>{col_height_, col_width_});
+  weight_ = Param::Create(conf.param(0));
+  bias_ = Param::Create(conf.param(1));
+  weight_->Setup(vector<int>{num_filters_, col_height_});
+  bias_->Setup(vector<int>{num_filters_});
+}
 
 void ConvolutionLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
@@ -184,7 +186,6 @@ void ConvolutionLayer::ComputeGradient(int flag,
           imgshp);
     }
   }
- // weight_->mutable_data()->mutable_cpu_data();
 }
 
 /******************* Implementation for CConvolutionLayer *********/

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
deleted file mode 100644
index f5a0560..0000000
--- a/src/trainer/server.cc
+++ /dev/null
@@ -1,256 +0,0 @@
-#include <thread>
-#include <chrono>
-#include "mshadow/tensor.h"
-#include "trainer/server.h"
-#include "utils/param.h"
-#include "utils/singleton.h"
-#include "utils/factory.h"
-#include "utils/cluster.h"
-#include "proto/common.pb.h"
-
-namespace singa {
-
-using namespace mshadow;
-using std::vector;
-
-Server::Server(int thread_id,int group_id, int server_id):
-  thread_id_(thread_id),grp_id_(group_id), id_(server_id){
-}
-
-void Server::Setup(const UpdaterProto& proto,
-    std::unordered_map<int, ParamEntry*>* shard,
-    const vector<int>& slice2group) {
-  updater_ = Updater::Create(proto);
-  shard_ = shard;
-  slice2group_ = slice2group;
-}
-
-Server::~Server() {
-  delete updater_;
-}
-
-void Stop(void * running) {
-  *static_cast<bool *>(running) = false;
-}
-
-void Server::Run() {
-  LOG(ERROR) << "Server (group = " << grp_id_ <<", id = " << id_ << ") start";
-  auto dealer = new Dealer(2*thread_id_);
-  CHECK(dealer->Connect(kInprocRouterEndpoint));
-  Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
-  ping->set_type(kConnect);
-  dealer->Send(&ping);
-
-  auto cluster = Cluster::Get();
-  bool running = true;
-  CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
-
-  int nserver_grps = cluster->nserver_groups();
-  vector<Param*> master_params;
-  size_t syncEntry=0;
-  Poller poll(dealer);
-  // start recv loop and process requests
-  while (running) {
-    auto *sock = poll.Wait(cluster->poll_time());
-    if (poll.Terminated()) {
-      LOG(ERROR) << "Connection broken!";
-      exit(0);
-    } else if (sock == nullptr) {
-      continue;
-    }
-    Msg* msg=dealer->Receive();
-    if (msg==nullptr) break;
-    Msg* response=nullptr;
-    int type=msg->type();
-    int slice_id = SliceID(msg->trgt_val());
-    if (type == kPut) {
-      response = HandlePut(&msg);
-      if(slice2group_[slice_id] == grp_id_)
-        master_params.push_back(shard_->at(slice_id)->shares.at(0));
-    } else {
-      if (shard_->find(slice_id) == shard_->end()) {
-        // delay the processing by re-queue the msg.
-        response = msg;
-      } else if (type == kSyncReminder) {
-        DeleteMsg(&msg);
-        if(syncEntry >= master_params.size())
-          continue;
-        auto param = master_params.at(syncEntry);
-        // control the frequency of synchronization
-        // currently sync is triggerred only when the slice is updated
-        // by local worker or other workers for at least nserver_groups times.
-        // TODO may optimize the trigger condition.
-        if (abs(param->local_version() - param->version()) >= nserver_grps) {
-          for (auto msg : GenSyncMsgs(param))
-            dealer->Send(&msg);
-          syncEntry = (syncEntry+1) % master_params.size();
-        }
-      } else {
-        switch (type) {
-          case kGet:
-            response = HandleGet(&msg);
-            break;
-          case kUpdate:
-            for (auto reply : HandleUpdate(&msg))
-              dealer->Send(&reply);
-            break;
-          case kSyncRequest:
-            response = HandleSyncRequest(&msg);
-            break;
-          default:
-            LOG(ERROR)<<"Unknown message type "<<type;
-            break;
-        }
-      }
-    }
-    if (response != nullptr)
-      dealer->Send(&response);
-  }
-
-  // send stop msg to stub
-  Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
-  msg->set_type(kStop);
-  dealer->Send(&msg);
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-
-  LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
-  delete dealer;
-}
-
-const vector<Msg*> Server::GenSyncMsgs(Param* param) {
-  vector<Msg*> ret;
-  // TODO replace the argument (0,0) to sync a chunk instead of a slice
-  auto msg = param->GenSyncMsg(0, 0);
-  auto cluster = Cluster::Get();
-  for (int i = 0; i < cluster->nserver_groups(); i++) {
-    if (i != grp_id_) {
-      Msg* tmp = msg;
-      if (i < cluster->nserver_groups() - 1)
-        tmp = new Msg(*msg);
-      // assume only one server per group, TODO generalize it
-      tmp->set_dst(Addr(i, 0, kServer));
-      tmp->set_src(Addr(grp_id_, id_, kServer));
-      ret.push_back(tmp);
-      param->set_version(param->local_version());
-      //LOG(ERROR)<<"sync slice="<<param->id()<<" to procs "<<i;
-    }
-  }
-  return ret;
-}
-
-Msg* Server::HandlePut(Msg **msg) {
-  int version = (*msg)->trgt_version();
-  int slice_id = SliceID((*msg)->trgt_val());
-  if (shard_->find(slice_id) != shard_->end())
-    LOG(FATAL) << "Param (" << slice_id << ") is put more than once";
-
-  // TODO(wangwei) replace hard coded param type 0
-  auto  param = Singleton<Factory<Param>>::Instance()->Create(0);
-  auto response = param->HandlePutMsg(msg, true);
-  // parse num of shares of this param from a worker group
-  int num_shares = 1;
-  if ((*msg)->NextFrame())
-    (*msg)->ParseFormatFrame("i", &num_shares);
-  DeleteMsg(msg);
-  (*shard_)[slice_id] = new ParamEntry(num_shares, param);
-  // must set version after HandlePutMsg which allocates the memory
-  param->set_version(version);
-  param->set_local_version(version);
-  param->set_id(slice_id);
-  //LOG(ERROR)<<"put norm "<<param->data().asum_data()<<", "<<pid;
-  // allocate blob for param sync between groups.
-  if (Cluster::Get()->nserver_groups() > 1 && slice2group_[slice_id] != grp_id_) {
-    last_data_[slice_id] = std::make_shared<Blob<float>>();
-    last_data_[slice_id]->ReshapeLike(param->data());
-    last_data_[slice_id]->CopyFrom(param->data());
-  }
-  LOG(INFO)<<"server (group = " << grp_id_ << ", id = " << id_ <<") put slice="
-    << slice_id << " size=" << param->size();
-  return response;
-}
-
-Msg* Server::HandleGet(Msg **msg) {
-  int val = (*msg)->trgt_val();
-  auto param = shard_->at(SliceID(val))->shares.at(0);
-  // re-queue the request if the param is not updated to the required version
-  if(param->version()<(*msg)->trgt_version())
-    return *msg;
-  else {
-    // LOG(ERROR) << "get " << slice << " from "<<(*msg)->src_first();
-    auto reply = param->HandleGetMsg(msg, false);
-    reply->set_trgt(val, param->version());
-    return reply;
-  }
-}
-
-const vector<Msg*> Server::HandleUpdate(Msg **msg) {
-  vector<Msg*> ret;
-  int sliceid = SliceID((*msg)->trgt_val());
-  auto entry = shard_->at(sliceid);
-  buffer_requests_[sliceid].push_back(*msg);
-  int num_update;
-  (*msg)->LastFrame();
-  (*msg)->ParseFormatFrame("i", &num_update);
-  (*msg)->FirstFrame();
-  entry->num_update += num_update;
-  // LOG(ERROR) << "update "<<sliceid<< " from "<<(*msg)->src_second()
-  //  << ", " << num_update << " total " << entry->num_total;
-  // do update until recv gradients from all shares of this param/slice
-  if (entry->num_update >= entry->num_total) {
-    CHECK_EQ(entry->num_update, entry->num_total);
-    auto& request = buffer_requests_.at(sliceid);
-    int step = (*msg)->trgt_version();
-    auto param = entry->shares.at(0);
-    // extract and aggregate gradients
-    param->ParseUpdateMsgs(request);
-    updater_->Update(step, param, 1.0f / entry->num_total);
-    param->set_local_version(param->local_version() + 1);
-    // response to all shares of this param
-    for (auto response : param->GenUpdateResponseMsgs(&request, false)) {
-      response->set_trgt((*msg)->trgt_val(), param->local_version());
-      ret.push_back(response);
-    }
-    entry->num_update = 0;
-  }
-  *msg = nullptr;
-  return ret;
-}
-
-Msg* Server::HandleSyncRequest(Msg **msg) {
-  Msg* msgg = *msg;
-  int slice = SliceID(msgg->trgt_val());
-  auto param = shard_->at(slice)->shares.at(0);
-  Msg* response=nullptr;
-  auto shape=Shape1(param->size());
-  CHECK_EQ(msgg->FrameSize(), param->size()*sizeof(float));
-  Tensor<cpu, 1> tmp(static_cast<float*>(msgg->FrameData()), shape);
-  Tensor<cpu, 1> cur(param->mutable_data()->mutable_cpu_data(), shape);
-  //LOG(ERROR)<<"Recv sync for "<<param->id();
-  if (slice2group_[slice] == grp_id_) {
-    // recv sync msg on slice I am mastering
-    cur+=tmp;
-    param->set_local_version(param->local_version()+1);
-  } else {  // recv sync msg on slice mastered by others
-    TensorContainer<cpu, 1> diff(shape);
-    Tensor<cpu, 1> prev(last_data_[param->id()]->mutable_cpu_data(), shape);
-    diff=cur-prev;
-    msgg->NextFrame();
-    int bandwidth;
-    msgg->ParseFormatFrame("i", &bandwidth);
-    if (bandwidth > 0) {
-      // send back my updates to the server group mastering this param
-      response=new Msg(msgg->dst(), msgg->src());
-      response->set_type(kSyncRequest);
-      response->set_trgt(param->id(), param->version());
-      response->AddFrame(diff.dptr, param->size()*sizeof(float));
-      prev=diff+tmp;
-      Copy(cur, prev);
-    } else {  // no bandwidth, aggregate my updates for next sync
-      Copy(prev, tmp);
-      cur=tmp+diff;
-    }
-  }
-  DeleteMsg(msg);
-  return response;
-}
-} /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
deleted file mode 100644
index c62e0d1..0000000
--- a/src/trainer/trainer.cc
+++ /dev/null
@@ -1,521 +0,0 @@
-#include <thread>
-#include <vector>
-#include <map>
-#include <chrono>
-#include <glog/logging.h>
-#include "utils/tinydir.h"
-#include <unistd.h>
-#include "utils/cluster.h"
-#include "utils/common.h"
-#include "proto/common.pb.h"
-#include "trainer/trainer.h"
-
-
-namespace singa {
-using std::vector;
-using std::map;
-using std::queue;
-using namespace std::chrono;
-using std::make_shared;
-
-/***********************Trainer****************************/
-Trainer::~Trainer() {
-  // free Params (i.e., slices) in server shard
-  for (auto entry : server_shard_)
-    for (auto param : entry.second->shares)
-      delete param;
-  delete router_;
-}
-
-const vector<int> SliceParams(const vector<Param*>& params) {
-  // for load-balance among servers in a group and among server groups
-  int nserver_grps = Cluster::Get()->nserver_groups();
-  int nservers_per_grp = Cluster::Get()->nservers_per_group();
-  int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp);
-
-  // collect sizes of unique Params
-  std::vector<int> paramsize;
-  for (auto param : params)
-    if (param->id() == param->owner())
-      paramsize.push_back(param->size());
-  // slice into lcm pieces to achieve good load-balance for both intra-group
-  // partition (among servers in a group) and inter-group partition (each group
-  // is assgined a sub-set of slices)
-  auto param_slice = Slice(lcm, paramsize);
-  // construct map from Param ID to its slices <slice id, len>
-  std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
-  vector<int> slices;
-  auto it = param_slice.begin();
-  int slice_id = 0;
-  for (auto param : params) {
-    if (param->id() == param->owner()) {
-      for (int len : *it) {
-        slices.push_back(len);
-        paramid2slices[param->id()].push_back(std::make_pair(slice_id++, len));
-      }
-      it++;
-    }
-  }
-  // add slice info for every Param
-  for (auto param : params)
-    for (auto entry : paramid2slices[param->owner()]) {
-      param->AddSlice(entry.first, entry.second);
-      LOG(INFO) << "param id " << param->id() << " owner=" << param->owner()
-        << ": " << entry.first << ", " << entry.second;
-    }
-  return slices;
-}
-
-void Trainer::SetupWorkerServer(
-    const JobProto& job_conf,
-    const vector<Worker*>& workers,
-    const vector<Server*>& servers) {
-  auto cluster = Cluster::Get();
-  int grp_size = cluster->nworkers_per_group();
-  const auto& net_conf = job_conf.neuralnet();
-  auto net = NeuralNet::Create(net_conf, kTrain, grp_size);
-  // MUST do SliceParam before share param/net with others
-  auto slices = SliceParams(net->params());
-
-  std::unordered_map<int, shared_ptr<NeuralNet>> grp_net;
-  int first_grp = workers.size() ? workers.at(0)->grp_id() : -1;
-  for (auto worker : workers) {
-    int grp_id = worker->grp_id();
-    int worker_id = worker->id();
-    shared_ptr<NeuralNet> test_net = nullptr, valid_net = nullptr;
-    if (grp_net.find(grp_id) == grp_net.end()) {
-      if (grp_id == first_grp) {
-        //  test are performed only by the first group now. TODO update.
-        if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) {
-          test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp
-          test_net->ShareParamsFrom(net);
-        }
-        //  validation are performed only by the first group. TODO update.
-        if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) {
-          valid_net = NeuralNet::Create(net_conf, kValidation, 1);
-          valid_net->ShareParamsFrom(net);
-        }
-        grp_net[grp_id] = net;
-      } else {
-        grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size);
-        if(cluster->share_memory())
-          grp_net[grp_id]->ShareParamsFrom(net);
-      }
-      for (auto layer : grp_net[grp_id]->layers()) {
-        bool local = layer->partition_id() >= workers.front()->id()
-          && layer->partition_id() <= workers.back()->id();
-        for (auto param : layer->GetParams()) {
-          int hash = Hash(grp_id, param->owner());
-          if (worker_shard_.find(hash) == worker_shard_.end())
-            worker_shard_[hash] = new ParamEntry();
-          worker_shard_[hash]->AddParam(local, param);
-        }
-      }
-    }
-    LOG(INFO) << "grp " << worker->grp_id() << ", worker "
-      << worker->id() << " net " << grp_net[grp_id].get();
-    worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net);
-  }
-
-  //  partition among server groups, each group maintains one sub-set for sync
-  auto slice2group = PartitionSlices(cluster->nserver_groups(), slices);
-  for (auto server : servers)
-    server->Setup(job_conf.updater(), &server_shard_, slice2group);
-  //  partition within one server group, each server updates for one sub-set
-  slice2server_ = PartitionSlices(cluster->nservers_per_group(), slices);
-}
-
-vector<Server*> Trainer::CreateServers(int nthreads, const JobProto& job) {
-  auto cluster = Cluster::Get();
-  vector<Server*> servers;
-  if (!cluster->has_server())
-    return servers;
-
-  int pid = cluster->procs_id();
-  // if true, server procs (logical) id starts after worker procs
-  if (cluster->server_worker_separate())
-    pid -= cluster->nworker_procs();
-  int procs_size = cluster->nservers_per_procs();
-  int grp_size = cluster->nservers_per_group();
-  int gid = pid *  procs_size / grp_size;
-  int start = pid * procs_size % grp_size;
-  int end = start + procs_size;
-  for (int sid = start; sid < end; sid++) {
-    auto server = new Server(nthreads++, gid, sid);
-    servers.push_back(server);
-  }
-  return servers;
-}
-
-vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
-  auto cluster=Cluster::Get();
-  vector<Worker*> workers;
-  if(!cluster->has_worker())
-    return workers;
-  int pid = cluster->procs_id();
-  int grp_size = cluster->nworkers_per_group();
-  int procs_size = cluster->nworkers_per_procs();
-  int gstart, gend, wstart, wend;
-  if (grp_size >= procs_size) {
-    // all workers in this procs are from the same group
-    gstart = pid * procs_size / grp_size;
-    gend = gstart + 1;
-    wstart = pid * procs_size % grp_size;
-    wend = wstart + procs_size;
-  } else {
-    // there are multiple (complete) groups in this procs.
-    CHECK_EQ(procs_size % grp_size, 0);
-    int groups_per_procs = procs_size / grp_size;
-    gstart = pid * groups_per_procs;
-    gend = (pid+1) * groups_per_procs;
-    wstart = 0;
-    wend = grp_size;
-  }
-  for (int gid = gstart; gid < gend; gid++) {
-    for (int wid = wstart; wid < wend; wid++) {
-      auto *worker = Worker::Create(job);
-      worker->Init(nthreads++,gid, wid);
-      workers.push_back(worker);
-    }
-  }
-  return workers;
-}
-
-void Trainer::Resume(JobProto* jobConf) {
-  tinydir_dir dir;
-  string folder = Cluster::Get()->checkpoint_folder();
-  tinydir_open(&dir, folder.c_str());
-  int latest_step = 0;
-  // there would be multi checkpoint files (from diff workers) for one step
-  vector<string> ck_files;
-  // iterate all files to get the files for the last checkpoint
-  while (dir.has_next) {
-    tinydir_file file;
-    tinydir_readfile(&dir, &file);
-    tinydir_next(&dir);
-    char* ch = strstr(file.name, "step");
-    if (ch == nullptr) {
-      if (file.name[0] != '.')
-        LOG(INFO) << "Irregular file in checkpoint folder: " << file.name;
-      continue;
-    }
-
-    LOG(INFO) << "Add checkpoint file for resume: " << ch;
-    int step = atoi(ch+4);
-    if (step == latest_step) {
-      ck_files.push_back(file.name);
-    } else if(step > latest_step) {
-      latest_step = step;
-      ck_files.clear();
-      ck_files.push_back(string(file.name));
-    }
-  }
-
-  if (latest_step > 0) {
-    jobConf->set_step(latest_step);
-    if (!jobConf->has_reset_param_version())
-      jobConf->set_reset_param_version(false);
-    jobConf->clear_checkpoint_path();
-    for (auto ck_file : ck_files)
-      jobConf->add_checkpoint_path(folder + "/" + ck_file);
-  }
-  tinydir_close(&dir);
-}
-
-void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
-  // register job to zookeeper at the beginning
-  auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster());
-  if (resume)
-    Resume(job);
-
-  router_ = new Router();
-  router_->Bind(kInprocRouterEndpoint);
-  const string hostip = cluster->hostip();
-  int port = router_->Bind("tcp://" + hostip + ":*");
-  // register endpoint to zookeeper
-  cluster->Register(getpid(), hostip + ":" + std::to_string(port));
-
-  int nthreads = 1;
-  const vector<Worker*> workers = CreateWorkers(nthreads, *job);
-  nthreads += workers.size();
-  const vector<Server*> servers = CreateServers(nthreads, *job);
-  SetupWorkerServer(*job, workers, servers);
-
-#ifdef USE_MPI
-  for (int i = 0; i < nthreads; i++)
-    MPIQueues.push_back(make_shared<SafeQueue>());
-#endif
-  vector<std::thread> threads;
-  for(auto server : servers)
-    threads.push_back(std::thread(&Server::Run, server));
-  for(auto worker : workers)
-    threads.push_back(std::thread(&Worker::Run, worker));
-  Run(workers, servers);
-  for(auto& thread : threads)
-    thread.join();
-  for(auto server : servers)
-    delete server;
-  for(auto worker : workers)
-    delete worker;
-}
-
-inline int bandwidth(int bytes, system_clock::time_point start) {
-  auto now=system_clock::now();
-  auto duration=duration_cast<std::chrono::milliseconds> (now - start);
-  return static_cast<int>(bytes*1000.f/duration.count());
-}
-
-void Trainer::Run(
-    const vector<Worker*>& workers,
-    const vector<Server*>& servers) {
-  int nworkers = workers.size(), nservers = servers.size();
-  auto cluster = Cluster::Get();
-  procs_id_ = cluster->procs_id();
-  LOG(INFO) << "Stub in process " << procs_id_ << " starts";
-
-  // for sync among server groups
-  auto start = std::chrono::system_clock::now();
-  float trans_size = 0.f;  // total size of msg transferred since start time
-  int sync_server_id = 0;
-  int max_bandwidth = cluster->bandwidth();
-  int nserver_grps = cluster->nserver_groups();
-
-  map<int, Dealer*> inter_dealers;  // for sending msg to other procs
-
-  std::queue<Msg*> msg_queue;
-  Poller poll(router_);
-  bool stop=false;
-  while (!stop || !msg_queue.empty()) {
-    if (msg_queue.empty()) {
-      // if the poll time is large, then the poller may not expire
-      // if it is small, then many reminder messages will be sent which may
-      // slow done the process of other request. TODO tune it.
-      auto *sock = poll.Wait(cluster->poll_time());
-      if (poll.Terminated()) {
-        LOG(ERROR) << "Connection broken!";
-        exit(0);
-      } else if (sock == nullptr) {
-        if (nserver_grps > 1 && bandwidth(trans_size, start) < max_bandwidth) {
-          Msg* msg = GenSyncReminderMsg(sync_server_id, servers);
-          router_->Send(&msg) ;
-          sync_server_id = (sync_server_id + 1) % nservers;
-        }
-        continue;
-      }
-      Msg* msg = router_->Receive();
-      msg_queue.push(msg);
-    }
-    Msg* msg = msg_queue.front();
-    msg_queue.pop();
-    int type = msg->type(), dst = msg->dst(), flag = AddrType(dst);
-    if (flag == kStub && (AddrProc(dst) == procs_id_ || AddrGrp(dst) == -1)) {
-      if (type == kConnect) {
-        DeleteMsg(&msg);
-      } else if (type == kMetric) {
-        DisplayMetric(&msg);
-      } else if (type == kStop) {
-        int src_flag = AddrType(msg->src());
-        if (src_flag == kServer) nservers--;
-        else if (src_flag == kWorkerParam) nworkers--;
-        DeleteMsg(&msg);
-        if (nworkers == 0 && nservers == 0) break;
-      } else if (nserver_grps > 0) {
-        HandleLocalMsg(&msg_queue, &msg);
-      } else {
-        DeleteMsg(&msg);
-      }
-    } else {
-      int dst_procs = AddrProc(dst);
-      if (flag != kStub)
-        dst_procs = cluster->ProcsIDOf(AddrGrp(dst), AddrID(dst), flag);
-      if (dst_procs != procs_id_) {
-        if (bandwidth(trans_size, start) <= cluster->bandwidth()) {
-          start = std::chrono::system_clock::now();
-          trans_size = 0;
-        }
-        trans_size += msg->size();
-
-        if (inter_dealers.find(dst_procs) == inter_dealers.end())
-          inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs);
-        inter_dealers[dst_procs]->Send(&msg);
-      } else {
-        if (type == kSyncRequest)
-          msg->AddFormatFrame("i", max_bandwidth - bandwidth(trans_size, start));
-        router_->Send(&msg);
-      }
-    }
-  }
-  LOG(ERROR) << "Stub in process " << procs_id_ << " stops";
-  for (auto& entry : inter_dealers)
-    delete entry.second;
-}
-
-Msg* Trainer::GenSyncReminderMsg(int server, const vector<Server*>& servers ) {
-  Msg* msg = new Msg();
-  msg->set_src(Addr(-1,-1, kStub));
-  msg->set_dst(Addr(servers[server]->grp_id(), servers[server]->id(), kServer));
-  msg->set_type(kSyncReminder);
-  return msg;
-}
-
-void Trainer::DisplayMetric(Msg** msg) {
-  Msg* msgg = *msg;
-  // only display metrics from the first group
-  if (AddrGrp(msgg->src()) == 0) {
-    int step = msgg->trgt_version();
-    char prefix[128];
-    msgg->ParseFormatFrame("s", prefix);
-    CHECK(msgg->NextFrame());
-    const string perf(static_cast<char*>(msgg->FrameData()), msgg->FrameSize());
-    Metric cur(perf);
-    LOG(ERROR) << prefix << " step-" << step <<", " << cur.ToLogString();
-  }
-  DeleteMsg(msg);
-}
-
-Dealer* Trainer::CreateInterProcsDealer(int dst_procs) {
-  // forward to other procs
-  auto cluster = Cluster::Get();
-  auto dealer = new Dealer();
-  while(cluster->endpoint(dst_procs)=="") {
-    //kCollectSleepTime));
-    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
-    LOG(ERROR)<<"waiting for procs "<< dst_procs<<" to register";
-  }
-  dealer->Connect("tcp://"+cluster->endpoint(dst_procs));
-  return dealer;
-}
-
-void Trainer::HandleLocalMsg(queue<Msg*>* msg_queue, Msg** msg) {
-  Msg* msgg = *msg;
-  int paramid = ParamID(msgg->trgt_val());
-  int type = msgg->type();
-  int grp;
-  ParamEntry *entry = nullptr;
-  switch (type) {  // TODO process other requests, e.g. RESTful
-    case kUpdate:
-      grp = AddrGrp(msgg->src());
-      entry = worker_shard_.at(Hash(grp, paramid));
-      for(auto update_msg : HandleUpdate(entry, msg))
-        msg_queue->push(update_msg);
-      break;
-    case kRUpdate:
-      grp = AddrGrp(msgg->dst());
-      entry = worker_shard_.at(Hash(grp, paramid));
-      HandleUpdateResponse(entry, msg);
-      break;
-    case kGet:
-      grp = AddrGrp(msgg->src());
-      entry = worker_shard_.at(Hash(grp, paramid));
-      for(auto get_msg : HandleGet(entry, msg))
-        msg_queue->push(get_msg);
-      break;
-    case kRGet:
-      grp = AddrGrp(msgg->dst());
-      entry = worker_shard_.at(Hash(grp, paramid));
-      HandleGetResponse(entry, msg);
-      break;
-    case kPut:
-      grp = AddrGrp(msgg->src());
-      entry = worker_shard_.at(Hash(grp, paramid));
-      for(auto put_msg : HandlePut(entry, msg))
-        msg_queue->push(put_msg);
-      break;
-    default:
-      LOG(ERROR)<<"Unknow message type:"<<type;
-      break;
-  }
-}
-
-void Trainer::GenMsgs(int type, int version, ParamEntry* entry,
-    Msg* msg, vector<Msg*> *ret) {
-  int src_grp = AddrGrp(msg->src());
-  int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group();
-  auto param=entry->shares.at(0);
-  for (int idx = 0 ; idx < param->num_slices(); idx++) {
-    int slice_id =param->slice_start() + idx;
-    int server = slice2server_[slice_id];
-    int procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer);
-    Msg* new_msg = nullptr;
-    if (type == kPut) {
-      CHECK_GT(entry->num_total, 0);
-      new_msg = param->GenPutMsg(procs != procs_id_, idx);
-      // new_msg = param->GenPutMsg(true, idx);
-      new_msg->AddFormatFrame("i", entry->num_total);
-    } else if (type == kGet) {
-      new_msg = param->GenGetMsg(procs != procs_id_, idx);
-      // new_msg = param->GenGetMsg(true, idx);
-    } else if (type == kUpdate) {
-      new_msg = param->GenUpdateMsg(procs != procs_id_, idx);
-      // new_msg = param->GenUpdateMsg(true, idx);
-      new_msg->AddFormatFrame("i", entry->num_local);
-    } else {
-      LOG(FATAL) << "Wrong type";
-    }
-    new_msg->set_trgt(ParamTrgt(param->owner(), slice_id), version);
-    new_msg->set_src(Addr(src_grp, procs_id_, kStub));
-    new_msg->set_dst(Addr(dst_grp, server, kServer));
-    ret->push_back(new_msg);
-  }
-}
-
-const vector<Msg*> Trainer::HandleGet(ParamEntry* entry, Msg** msg) {
-  vector<Msg*> ret;
-  int version = (*msg)->trgt_version();
-  if (version > entry->next_version) {
-    entry->next_version = version;
-    GenMsgs(kGet, version, entry, *msg, &ret);
-  }
-  DeleteMsg(msg);
-  return ret;
-}
-
-const vector<Msg*> Trainer::HandleUpdate(ParamEntry *entry, Msg** msg) {
-  vector<Msg*> ret;
-  entry->num_update++;
-  if (entry->num_update >= entry->num_local) {
-    // average local gradient
-    if (entry->num_local > 1) {
-      auto it = entry->shares.begin();
-      float* sum = (*it)->mutable_grad()->mutable_cpu_data();
-      for (++it; it != entry->shares.end(); it++) {
-        float* grad = (*it)->mutable_grad()->mutable_cpu_data();
-        for (int i = 0; i < (*it)->size(); i++) {
-          sum[i] += grad[i];
-        }
-      }
-    }
-    int step = (*msg)->trgt_version();
-    GenMsgs(kUpdate, step, entry, *msg, &ret);
-    entry->num_update = 0;
-  }
-  DeleteMsg(msg);
-  return ret;
-}
-
-const vector<Msg*> Trainer::HandlePut(ParamEntry* entry, Msg** msg) {
-  vector<Msg*> ret;
-  int version = (*msg)->trgt_version();
-  GenMsgs(kPut, version, entry, *msg, &ret);
-  DeleteMsg(msg);
-  return ret;
-}
-
-void Trainer::HandleGetResponse(ParamEntry* entry, Msg** msg) {
-  int version = (*msg)->trgt_version();
-  int sliceid = SliceID((*msg)->trgt_val());
-  auto param = entry->shares.at(0);
-  if (param->ParseGetResponseMsg(*msg, sliceid-param->slice_start()))
-    param->set_version(version);
-  DeleteMsg(msg);
-}
-
-void Trainer::HandleUpdateResponse(ParamEntry* entry, Msg** msg) {
-  int version = (*msg)->trgt_version();
-  int sliceid = SliceID((*msg)->trgt_val());
-  auto param = entry->shares.at(0);
-  if (param->ParseUpdateResponseMsg(*msg, sliceid-param->slice_start()))
-    param->set_version(version);
-  DeleteMsg(msg);
-}
-} /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/utils/blob.cc
----------------------------------------------------------------------
diff --git a/src/utils/blob.cc b/src/utils/blob.cc
index b27f7db..4a9d681 100644
--- a/src/utils/blob.cc
+++ b/src/utils/blob.cc
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -192,7 +192,6 @@ void SyncedMemory::to_gpu() {
   case UNINITIALIZED:
     CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
     //CUDA_CHECK(cudaMemset(gpu_ptr_, 0, N));
-	//
     CUDA_CHECK(cudaMemset(gpu_ptr_, 0, size_));
     head_ = HEAD_AT_GPU;
     break;