Posted to commits@singa.apache.org by wa...@apache.org on 2015/09/29 05:23:56 UTC
[5/5] incubator-singa git commit: SINGA-41: Support single node single GPU training
SINGA-41: Support single node single GPU training
Rebased to version 0.1.0.
Tested with the cifar10 example.
The -G compilation option degrades performance.
Speed is similar to CConvolution+CPooling on the CPU.
The speed still needs to be optimized.
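For context on the performance note above: nvcc's -G option embeds device debug information and disables device-code optimizations, which is the usual reason a CUDA kernel runs no faster than its CPU counterpart. A minimal sketch of the difference, assuming a plain nvcc invocation (the actual SINGA build flags are not part of this commit; kernel.cu is a placeholder):

    # device-debug build: device code is unoptimized and runs slowly
    nvcc -G -c kernel.cu -o kernel.o
    # optimized build: keep source-line info for profiling without the slowdown
    nvcc -O2 -lineinfo -c kernel.cu -o kernel.o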
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/9dbdfd68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/9dbdfd68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/9dbdfd68
Branch: refs/heads/gpu
Commit: 9dbdfd68695e8d3a30cafc14d941a36ea0bf55d6
Parents: 1770377
Author: Wei Wang <wa...@comp.nus.edu.sg>
Authored: Tue Sep 29 11:18:33 2015 +0800
Committer: Wei Wang <wa...@comp.nus.edu.sg>
Committed: Tue Sep 29 11:23:21 2015 +0800
----------------------------------------------------------------------
examples/cifar10/job.conf         |  12 +-
include/utils/blob.h              |  14 +-
src/neuralnet/connection_layer.cc |   6 +-
src/neuralnet/neuron_layer.cu     | 203 ++++++-------
src/trainer/server.cc             | 256 ----------------
src/trainer/trainer.cc            | 521 ---------------------------------
src/utils/blob.cc                 |   5 +-
7 files changed, 120 insertions(+), 897 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/examples/cifar10/job.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf
index 343d969..91688a4 100644
--- a/examples/cifar10/job.conf
+++ b/examples/cifar10/job.conf
@@ -57,7 +57,7 @@ neuralnet {
layer {
name: "conv1"
- type: kCConvolution
+ type: kConvolution
srclayers: "rgb"
convolution_conf {
num_filters: 32
@@ -84,7 +84,7 @@ neuralnet {
layer {
name: "pool1"
- type: kCPooling
+ type: kPooling
srclayers: "conv1"
pooling_conf {
pool: MAX
@@ -109,7 +109,7 @@ neuralnet {
}
layer {
name: "conv2"
- type: kCConvolution
+ type: kConvolution
srclayers: "norm1"
convolution_conf {
num_filters: 32
@@ -140,7 +140,7 @@ neuralnet {
}
layer {
name: "pool2"
- type: kCPooling
+ type: kPooling
srclayers: "relu2"
pooling_conf {
pool: AVG
@@ -160,7 +160,7 @@ neuralnet {
}
layer {
name: "conv3"
- type: kCConvolution
+ type: kConvolution
srclayers: "norm2"
convolution_conf {
num_filters: 64
@@ -190,7 +190,7 @@ neuralnet {
}
layer {
name: "pool3"
- type: kCPooling
+ type: kPooling
srclayers: "relu3"
pooling_conf {
pool: AVG
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/include/utils/blob.h
----------------------------------------------------------------------
diff --git a/include/utils/blob.h b/include/utils/blob.h
index 903845d..754abbe 100644
--- a/include/utils/blob.h
+++ b/include/utils/blob.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
-*
+*
* http://www.apache.org/licenses/LICENSE-2.0
-*
+*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -184,11 +184,11 @@ class Blob {
}
inline Dtype* mutable_xpu_data() {
CHECK(data_);
- #ifndef CPU_ONLY
- return static_cast<Dtype*>(data_->mutable_gpu_data());
- #else
- return static_cast<Dtype*>(data_->mutable_cpu_data());
- #endif
+ #ifndef CPU_ONLY
+ return static_cast<Dtype*>(data_->mutable_gpu_data());
+ #else
+ return static_cast<Dtype*>(data_->mutable_cpu_data());
+ #endif
}
/// @brief Compute the sum of absolute values (L1 norm) of the data.
Dtype asum_data() const;
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/neuralnet/connection_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/connection_layer.cc b/src/neuralnet/connection_layer.cc
index 750a511..acf243d 100644
--- a/src/neuralnet/connection_layer.cc
+++ b/src/neuralnet/connection_layer.cc
@@ -27,9 +27,9 @@ using std::vector;
/********* Implementation for BridgeDstLayer **************/
void BridgeDstLayer::Setup(const LayerProto& proto,
const vector<Layer*>& srclayers) {
- Layer::Setup(proto, npartitions);
- CHECK_EQ(srclayers_.size(), 1);
- data_.Reshape(srclayers_[0]->data(this).shape());
+ Layer::Setup(proto, srclayers);
+ CHECK_EQ(srclayers.size(), 1);
+ data_.Reshape(srclayers[0]->data(this).shape());
grad_.ReshapeLike(data_);
}
/************* Implementation for ConcateLayer ***********/
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/neuralnet/neuron_layer.cu
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuron_layer.cu b/src/neuralnet/neuron_layer.cu
index affb02f..a93ceb9 100644
--- a/src/neuralnet/neuron_layer.cu
+++ b/src/neuralnet/neuron_layer.cu
@@ -23,118 +23,120 @@
#include <glog/logging.h>
#include <algorithm>
+#include <string>
+#include <vector>
#include "utils/singleton.h"
#include "mshadow/tensor.h"
#include "mshadow/cxxnet_op.h"
namespace singa {
- using namespace mshadow;
- using namespace mshadow::expr;
- using mshadow::cpu;
- using mshadow::xpu;
-
- using mshadow::Shape;
- using mshadow::Shape1;
- using mshadow::Shape2;
- using mshadow::Shape3;
- using mshadow::Shape4;
- using mshadow::Tensor;
-
- using std::string;
- using std::vector;
-
- inline Tensor<cpu, 4> Tensor4CPU(Blob<float>* blob) {
- const vector<int>& shape = blob->shape();
- Tensor<cpu, 4> tensor(blob->mutable_cpu_data(),
- Shape4(shape[0], shape[1], shape[2], shape[3]));
- return tensor;
- }
+using namespace mshadow;
+using namespace mshadow::expr;
+using mshadow::cpu;
+using mshadow::xpu;
- inline Tensor<cpu, 3> Tensor3CPU(Blob<float>* blob) {
- const vector<int>& shape = blob->shape();
- Tensor<cpu, 3> tensor(blob->mutable_cpu_data(),
- Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
- return tensor;
- }
+using mshadow::Shape;
+using mshadow::Shape1;
+using mshadow::Shape2;
+using mshadow::Shape3;
+using mshadow::Shape4;
+using mshadow::Tensor;
- inline Tensor<cpu, 2> Tensor2CPU(Blob<float>* blob) {
- const vector<int>& shape = blob->shape();
- Tensor<cpu, 2> tensor(blob->mutable_cpu_data(),
- Shape2(shape[0], blob->count() / shape[0]));
- return tensor;
- }
+using std::string;
+using std::vector;
- inline Tensor<cpu, 1> Tensor1CPU(Blob<float>* blob) {
- Tensor<cpu, 1> tensor(blob->mutable_cpu_data(), Shape1(blob->count()));
- return tensor;
- }
+inline Tensor<cpu, 4> Tensor4CPU(Blob<float>* blob) {
+ const vector<int>& shape = blob->shape();
+ Tensor<cpu, 4> tensor(blob->mutable_cpu_data(),
+ Shape4(shape[0], shape[1], shape[2], shape[3]));
+ return tensor;
+}
- inline Tensor<xpu, 4> Tensor4(Blob<float>* blob) {
- const vector<int>& shape = blob->shape();
- Tensor<xpu, 4> tensor(blob->mutable_xpu_data(),
- Shape4(shape[0], shape[1], shape[2], shape[3]));
- return tensor;
- }
+inline Tensor<cpu, 3> Tensor3CPU(Blob<float>* blob) {
+ const vector<int>& shape = blob->shape();
+ Tensor<cpu, 3> tensor(blob->mutable_cpu_data(),
+ Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
+ return tensor;
+}
- inline Tensor<xpu, 3> Tensor3(Blob<float>* blob){
- const vector<int>& shape = blob->shape();
- Tensor<xpu, 3> tensor(blob->mutable_xpu_data(),
- Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
- return tensor;
- }
- inline Tensor<xpu, 2> Tensor2(Blob<float>* blob){
- const vector<int>& shape = blob->shape();
- Tensor<xpu, 2> tensor(blob->mutable_xpu_data(),
- Shape2(shape[0], blob->count() / shape[0]));
- return tensor;
- }
- inline Tensor<xpu, 1> Tensor1(Blob<float>* blob){
- Tensor<xpu, 1> tensor(blob->mutable_xpu_data(), Shape1(blob->count()));
- return tensor;
- }
+inline Tensor<cpu, 2> Tensor2CPU(Blob<float>* blob) {
+ const vector<int>& shape = blob->shape();
+ Tensor<cpu, 2> tensor(blob->mutable_cpu_data(),
+ Shape2(shape[0], blob->count() / shape[0]));
+ return tensor;
+}
- /************ Implementation for ConvolutionLayer*************************/
- ConvolutionLayer::~ConvolutionLayer() {
- delete weight_;
- delete bias_;
- }
- void ConvolutionLayer::Setup(const LayerProto& conf,
- const vector<Layer*>& srclayers) {
- CHECK_EQ(srclayers.size(), 1);
- Layer::Setup(conf, srclayers);
- ConvolutionProto conv_conf = conf.convolution_conf();
- kernel_ = conv_conf.kernel();
- CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
- pad_ = conv_conf.pad();
- stride_ = conv_conf.stride();
- num_filters_ = conv_conf.num_filters();
- if (partition_dim() > 0)
- num_filters_ /= srclayers.at(0)->num_partitions();
- const vector<int>& srcshape = srclayers[0]->data(this).shape();
- int dim = srcshape.size();
- CHECK_GT(dim, 2);
- width_ = srcshape[dim - 1];
- height_ = srcshape[dim - 2];
- if (dim > 3)
- channels_ = srcshape[dim - 3];
- else if (dim > 2)
- channels_ = 1;
- batchsize_ = srcshape[0];
- conv_height_ = (height_ + 2 * pad_ - kernel_) / stride_ + 1;
- conv_width_ = (width_ + 2 * pad_ - kernel_) / stride_ + 1;
- col_height_ = channels_ * kernel_ * kernel_;
- col_width_ = conv_height_ * conv_width_;
- vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_};
- data_.Reshape(shape);
- grad_.Reshape(shape);
- col_data_.Reshape(vector<int>{col_height_, col_width_});
- col_grad_.Reshape(vector<int>{col_height_, col_width_});
- weight_ = Param::Create(conf.param(0));
- bias_ = Param::Create(conf.param(1));
- weight_->Setup(vector<int>{num_filters_, col_height_});
- bias_->Setup(vector<int>{num_filters_});
- }
+inline Tensor<cpu, 1> Tensor1CPU(Blob<float>* blob) {
+ Tensor<cpu, 1> tensor(blob->mutable_cpu_data(), Shape1(blob->count()));
+ return tensor;
+}
+
+inline Tensor<xpu, 4> Tensor4(Blob<float>* blob) {
+ const vector<int>& shape = blob->shape();
+ Tensor<xpu, 4> tensor(blob->mutable_xpu_data(),
+ Shape4(shape[0], shape[1], shape[2], shape[3]));
+ return tensor;
+}
+
+inline Tensor<xpu, 3> Tensor3(Blob<float>* blob) {
+ const vector<int>& shape = blob->shape();
+ Tensor<xpu, 3> tensor(blob->mutable_xpu_data(),
+ Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
+ return tensor;
+}
+inline Tensor<xpu, 2> Tensor2(Blob<float>* blob) {
+ const vector<int>& shape = blob->shape();
+ Tensor<xpu, 2> tensor(blob->mutable_xpu_data(),
+ Shape2(shape[0], blob->count() / shape[0]));
+ return tensor;
+}
+inline Tensor<xpu, 1> Tensor1(Blob<float>* blob) {
+ Tensor<xpu, 1> tensor(blob->mutable_xpu_data(), Shape1(blob->count()));
+ return tensor;
+}
+
+/************ Implementation for ConvolutionLayer*************************/
+ConvolutionLayer::~ConvolutionLayer() {
+ delete weight_;
+ delete bias_;
+}
+void ConvolutionLayer::Setup(const LayerProto& conf,
+ const vector<Layer*>& srclayers) {
+ CHECK_EQ(srclayers.size(), 1);
+ Layer::Setup(conf, srclayers);
+ ConvolutionProto conv_conf = conf.convolution_conf();
+ kernel_ = conv_conf.kernel();
+ CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
+ pad_ = conv_conf.pad();
+ stride_ = conv_conf.stride();
+ num_filters_ = conv_conf.num_filters();
+ if (partition_dim() > 0)
+ num_filters_ /= srclayers.at(0)->num_partitions();
+ const vector<int>& srcshape = srclayers[0]->data(this).shape();
+ int dim = srcshape.size();
+ CHECK_GT(dim, 2);
+ width_ = srcshape[dim - 1];
+ height_ = srcshape[dim - 2];
+ if (dim > 3)
+ channels_ = srcshape[dim - 3];
+ else if (dim > 2)
+ channels_ = 1;
+ batchsize_ = srcshape[0];
+ conv_height_ = (height_ + 2 * pad_ - kernel_) / stride_ + 1;
+ conv_width_ = (width_ + 2 * pad_ - kernel_) / stride_ + 1;
+ col_height_ = channels_ * kernel_ * kernel_;
+ col_width_ = conv_height_ * conv_width_;
+ vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_};
+ data_.Reshape(shape);
+ grad_.Reshape(shape);
+ col_data_.Reshape(vector<int>{col_height_, col_width_});
+ col_grad_.Reshape(vector<int>{col_height_, col_width_});
+ weight_ = Param::Create(conf.param(0));
+ bias_ = Param::Create(conf.param(1));
+ weight_->Setup(vector<int>{num_filters_, col_height_});
+ bias_->Setup(vector<int>{num_filters_});
+}
void ConvolutionLayer::ComputeFeature(int flag,
const vector<Layer*>& srclayers) {
@@ -184,7 +186,6 @@ void ConvolutionLayer::ComputeGradient(int flag,
imgshp);
}
}
- // weight_->mutable_data()->mutable_cpu_data();
}
/******************* Implementation for CConvolutionLayer *********/
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
deleted file mode 100644
index f5a0560..0000000
--- a/src/trainer/server.cc
+++ /dev/null
@@ -1,256 +0,0 @@
-#include <thread>
-#include <chrono>
-#include "mshadow/tensor.h"
-#include "trainer/server.h"
-#include "utils/param.h"
-#include "utils/singleton.h"
-#include "utils/factory.h"
-#include "utils/cluster.h"
-#include "proto/common.pb.h"
-
-namespace singa {
-
-using namespace mshadow;
-using std::vector;
-
-Server::Server(int thread_id,int group_id, int server_id):
- thread_id_(thread_id),grp_id_(group_id), id_(server_id){
-}
-
-void Server::Setup(const UpdaterProto& proto,
- std::unordered_map<int, ParamEntry*>* shard,
- const vector<int>& slice2group) {
- updater_ = Updater::Create(proto);
- shard_ = shard;
- slice2group_ = slice2group;
-}
-
-Server::~Server() {
- delete updater_;
-}
-
-void Stop(void * running) {
- *static_cast<bool *>(running) = false;
-}
-
-void Server::Run() {
- LOG(ERROR) << "Server (group = " << grp_id_ <<", id = " << id_ << ") start";
- auto dealer = new Dealer(2*thread_id_);
- CHECK(dealer->Connect(kInprocRouterEndpoint));
- Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
- ping->set_type(kConnect);
- dealer->Send(&ping);
-
- auto cluster = Cluster::Get();
- bool running = true;
- CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
-
- int nserver_grps = cluster->nserver_groups();
- vector<Param*> master_params;
- size_t syncEntry=0;
- Poller poll(dealer);
- // start recv loop and process requests
- while (running) {
- auto *sock = poll.Wait(cluster->poll_time());
- if (poll.Terminated()) {
- LOG(ERROR) << "Connection broken!";
- exit(0);
- } else if (sock == nullptr) {
- continue;
- }
- Msg* msg=dealer->Receive();
- if (msg==nullptr) break;
- Msg* response=nullptr;
- int type=msg->type();
- int slice_id = SliceID(msg->trgt_val());
- if (type == kPut) {
- response = HandlePut(&msg);
- if(slice2group_[slice_id] == grp_id_)
- master_params.push_back(shard_->at(slice_id)->shares.at(0));
- } else {
- if (shard_->find(slice_id) == shard_->end()) {
- // delay the processing by re-queue the msg.
- response = msg;
- } else if (type == kSyncReminder) {
- DeleteMsg(&msg);
- if(syncEntry >= master_params.size())
- continue;
- auto param = master_params.at(syncEntry);
- // control the frequency of synchronization
- // currently sync is triggerred only when the slice is updated
- // by local worker or other workers for at least nserver_groups times.
- // TODO may optimize the trigger condition.
- if (abs(param->local_version() - param->version()) >= nserver_grps) {
- for (auto msg : GenSyncMsgs(param))
- dealer->Send(&msg);
- syncEntry = (syncEntry+1) % master_params.size();
- }
- } else {
- switch (type) {
- case kGet:
- response = HandleGet(&msg);
- break;
- case kUpdate:
- for (auto reply : HandleUpdate(&msg))
- dealer->Send(&reply);
- break;
- case kSyncRequest:
- response = HandleSyncRequest(&msg);
- break;
- default:
- LOG(ERROR)<<"Unknown message type "<<type;
- break;
- }
- }
- }
- if (response != nullptr)
- dealer->Send(&response);
- }
-
- // send stop msg to stub
- Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
- msg->set_type(kStop);
- dealer->Send(&msg);
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-
- LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
- delete dealer;
-}
-
-const vector<Msg*> Server::GenSyncMsgs(Param* param) {
- vector<Msg*> ret;
- // TODO replace the argument (0,0) to sync a chunk instead of a slice
- auto msg = param->GenSyncMsg(0, 0);
- auto cluster = Cluster::Get();
- for (int i = 0; i < cluster->nserver_groups(); i++) {
- if (i != grp_id_) {
- Msg* tmp = msg;
- if (i < cluster->nserver_groups() - 1)
- tmp = new Msg(*msg);
- // assume only one server per group, TODO generalize it
- tmp->set_dst(Addr(i, 0, kServer));
- tmp->set_src(Addr(grp_id_, id_, kServer));
- ret.push_back(tmp);
- param->set_version(param->local_version());
- //LOG(ERROR)<<"sync slice="<<param->id()<<" to procs "<<i;
- }
- }
- return ret;
-}
-
-Msg* Server::HandlePut(Msg **msg) {
- int version = (*msg)->trgt_version();
- int slice_id = SliceID((*msg)->trgt_val());
- if (shard_->find(slice_id) != shard_->end())
- LOG(FATAL) << "Param (" << slice_id << ") is put more than once";
-
- // TODO(wangwei) replace hard coded param type 0
- auto param = Singleton<Factory<Param>>::Instance()->Create(0);
- auto response = param->HandlePutMsg(msg, true);
- // parse num of shares of this param from a worker group
- int num_shares = 1;
- if ((*msg)->NextFrame())
- (*msg)->ParseFormatFrame("i", &num_shares);
- DeleteMsg(msg);
- (*shard_)[slice_id] = new ParamEntry(num_shares, param);
- // must set version after HandlePutMsg which allocates the memory
- param->set_version(version);
- param->set_local_version(version);
- param->set_id(slice_id);
- //LOG(ERROR)<<"put norm "<<param->data().asum_data()<<", "<<pid;
- // allocate blob for param sync between groups.
- if (Cluster::Get()->nserver_groups() > 1 && slice2group_[slice_id] != grp_id_) {
- last_data_[slice_id] = std::make_shared<Blob<float>>();
- last_data_[slice_id]->ReshapeLike(param->data());
- last_data_[slice_id]->CopyFrom(param->data());
- }
- LOG(INFO)<<"server (group = " << grp_id_ << ", id = " << id_ <<") put slice="
- << slice_id << " size=" << param->size();
- return response;
-}
-
-Msg* Server::HandleGet(Msg **msg) {
- int val = (*msg)->trgt_val();
- auto param = shard_->at(SliceID(val))->shares.at(0);
- // re-queue the request if the param is not updated to the required version
- if(param->version()<(*msg)->trgt_version())
- return *msg;
- else {
- // LOG(ERROR) << "get " << slice << " from "<<(*msg)->src_first();
- auto reply = param->HandleGetMsg(msg, false);
- reply->set_trgt(val, param->version());
- return reply;
- }
-}
-
-const vector<Msg*> Server::HandleUpdate(Msg **msg) {
- vector<Msg*> ret;
- int sliceid = SliceID((*msg)->trgt_val());
- auto entry = shard_->at(sliceid);
- buffer_requests_[sliceid].push_back(*msg);
- int num_update;
- (*msg)->LastFrame();
- (*msg)->ParseFormatFrame("i", &num_update);
- (*msg)->FirstFrame();
- entry->num_update += num_update;
- // LOG(ERROR) << "update "<<sliceid<< " from "<<(*msg)->src_second()
- // << ", " << num_update << " total " << entry->num_total;
- // do update until recv gradients from all shares of this param/slice
- if (entry->num_update >= entry->num_total) {
- CHECK_EQ(entry->num_update, entry->num_total);
- auto& request = buffer_requests_.at(sliceid);
- int step = (*msg)->trgt_version();
- auto param = entry->shares.at(0);
- // extract and aggregate gradients
- param->ParseUpdateMsgs(request);
- updater_->Update(step, param, 1.0f / entry->num_total);
- param->set_local_version(param->local_version() + 1);
- // response to all shares of this param
- for (auto response : param->GenUpdateResponseMsgs(&request, false)) {
- response->set_trgt((*msg)->trgt_val(), param->local_version());
- ret.push_back(response);
- }
- entry->num_update = 0;
- }
- *msg = nullptr;
- return ret;
-}
-
-Msg* Server::HandleSyncRequest(Msg **msg) {
- Msg* msgg = *msg;
- int slice = SliceID(msgg->trgt_val());
- auto param = shard_->at(slice)->shares.at(0);
- Msg* response=nullptr;
- auto shape=Shape1(param->size());
- CHECK_EQ(msgg->FrameSize(), param->size()*sizeof(float));
- Tensor<cpu, 1> tmp(static_cast<float*>(msgg->FrameData()), shape);
- Tensor<cpu, 1> cur(param->mutable_data()->mutable_cpu_data(), shape);
- //LOG(ERROR)<<"Recv sync for "<<param->id();
- if (slice2group_[slice] == grp_id_) {
- // recv sync msg on slice I am mastering
- cur+=tmp;
- param->set_local_version(param->local_version()+1);
- } else { // recv sync msg on slice mastered by others
- TensorContainer<cpu, 1> diff(shape);
- Tensor<cpu, 1> prev(last_data_[param->id()]->mutable_cpu_data(), shape);
- diff=cur-prev;
- msgg->NextFrame();
- int bandwidth;
- msgg->ParseFormatFrame("i", &bandwidth);
- if (bandwidth > 0) {
- // send back my updates to the server group mastering this param
- response=new Msg(msgg->dst(), msgg->src());
- response->set_type(kSyncRequest);
- response->set_trgt(param->id(), param->version());
- response->AddFrame(diff.dptr, param->size()*sizeof(float));
- prev=diff+tmp;
- Copy(cur, prev);
- } else { // no bandwidth, aggregate my updates for next sync
- Copy(prev, tmp);
- cur=tmp+diff;
- }
- }
- DeleteMsg(msg);
- return response;
-}
-} /* singa */
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
deleted file mode 100644
index c62e0d1..0000000
--- a/src/trainer/trainer.cc
+++ /dev/null
@@ -1,521 +0,0 @@
-#include <thread>
-#include <vector>
-#include <map>
-#include <chrono>
-#include <glog/logging.h>
-#include "utils/tinydir.h"
-#include <unistd.h>
-#include "utils/cluster.h"
-#include "utils/common.h"
-#include "proto/common.pb.h"
-#include "trainer/trainer.h"
-
-
-namespace singa {
-using std::vector;
-using std::map;
-using std::queue;
-using namespace std::chrono;
-using std::make_shared;
-
-/***********************Trainer****************************/
-Trainer::~Trainer() {
- // free Params (i.e., slices) in server shard
- for (auto entry : server_shard_)
- for (auto param : entry.second->shares)
- delete param;
- delete router_;
-}
-
-const vector<int> SliceParams(const vector<Param*>& params) {
- // for load-balance among servers in a group and among server groups
- int nserver_grps = Cluster::Get()->nserver_groups();
- int nservers_per_grp = Cluster::Get()->nservers_per_group();
- int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp);
-
- // collect sizes of unique Params
- std::vector<int> paramsize;
- for (auto param : params)
- if (param->id() == param->owner())
- paramsize.push_back(param->size());
- // slice into lcm pieces to achieve good load-balance for both intra-group
- // partition (among servers in a group) and inter-group partition (each group
- // is assgined a sub-set of slices)
- auto param_slice = Slice(lcm, paramsize);
- // construct map from Param ID to its slices <slice id, len>
- std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
- vector<int> slices;
- auto it = param_slice.begin();
- int slice_id = 0;
- for (auto param : params) {
- if (param->id() == param->owner()) {
- for (int len : *it) {
- slices.push_back(len);
- paramid2slices[param->id()].push_back(std::make_pair(slice_id++, len));
- }
- it++;
- }
- }
- // add slice info for every Param
- for (auto param : params)
- for (auto entry : paramid2slices[param->owner()]) {
- param->AddSlice(entry.first, entry.second);
- LOG(INFO) << "param id " << param->id() << " owner=" << param->owner()
- << ": " << entry.first << ", " << entry.second;
- }
- return slices;
-}
-
-void Trainer::SetupWorkerServer(
- const JobProto& job_conf,
- const vector<Worker*>& workers,
- const vector<Server*>& servers) {
- auto cluster = Cluster::Get();
- int grp_size = cluster->nworkers_per_group();
- const auto& net_conf = job_conf.neuralnet();
- auto net = NeuralNet::Create(net_conf, kTrain, grp_size);
- // MUST do SliceParam before share param/net with others
- auto slices = SliceParams(net->params());
-
- std::unordered_map<int, shared_ptr<NeuralNet>> grp_net;
- int first_grp = workers.size() ? workers.at(0)->grp_id() : -1;
- for (auto worker : workers) {
- int grp_id = worker->grp_id();
- int worker_id = worker->id();
- shared_ptr<NeuralNet> test_net = nullptr, valid_net = nullptr;
- if (grp_net.find(grp_id) == grp_net.end()) {
- if (grp_id == first_grp) {
- // test are performed only by the first group now. TODO update.
- if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) {
- test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp
- test_net->ShareParamsFrom(net);
- }
- // validation are performed only by the first group. TODO update.
- if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) {
- valid_net = NeuralNet::Create(net_conf, kValidation, 1);
- valid_net->ShareParamsFrom(net);
- }
- grp_net[grp_id] = net;
- } else {
- grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size);
- if(cluster->share_memory())
- grp_net[grp_id]->ShareParamsFrom(net);
- }
- for (auto layer : grp_net[grp_id]->layers()) {
- bool local = layer->partition_id() >= workers.front()->id()
- && layer->partition_id() <= workers.back()->id();
- for (auto param : layer->GetParams()) {
- int hash = Hash(grp_id, param->owner());
- if (worker_shard_.find(hash) == worker_shard_.end())
- worker_shard_[hash] = new ParamEntry();
- worker_shard_[hash]->AddParam(local, param);
- }
- }
- }
- LOG(INFO) << "grp " << worker->grp_id() << ", worker "
- << worker->id() << " net " << grp_net[grp_id].get();
- worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net);
- }
-
- // partition among server groups, each group maintains one sub-set for sync
- auto slice2group = PartitionSlices(cluster->nserver_groups(), slices);
- for (auto server : servers)
- server->Setup(job_conf.updater(), &server_shard_, slice2group);
- // partition within one server group, each server updates for one sub-set
- slice2server_ = PartitionSlices(cluster->nservers_per_group(), slices);
-}
-
-vector<Server*> Trainer::CreateServers(int nthreads, const JobProto& job) {
- auto cluster = Cluster::Get();
- vector<Server*> servers;
- if (!cluster->has_server())
- return servers;
-
- int pid = cluster->procs_id();
- // if true, server procs (logical) id starts after worker procs
- if (cluster->server_worker_separate())
- pid -= cluster->nworker_procs();
- int procs_size = cluster->nservers_per_procs();
- int grp_size = cluster->nservers_per_group();
- int gid = pid * procs_size / grp_size;
- int start = pid * procs_size % grp_size;
- int end = start + procs_size;
- for (int sid = start; sid < end; sid++) {
- auto server = new Server(nthreads++, gid, sid);
- servers.push_back(server);
- }
- return servers;
-}
-
-vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
- auto cluster=Cluster::Get();
- vector<Worker*> workers;
- if(!cluster->has_worker())
- return workers;
- int pid = cluster->procs_id();
- int grp_size = cluster->nworkers_per_group();
- int procs_size = cluster->nworkers_per_procs();
- int gstart, gend, wstart, wend;
- if (grp_size >= procs_size) {
- // all workers in this procs are from the same group
- gstart = pid * procs_size / grp_size;
- gend = gstart + 1;
- wstart = pid * procs_size % grp_size;
- wend = wstart + procs_size;
- } else {
- // there are multiple (complete) groups in this procs.
- CHECK_EQ(procs_size % grp_size, 0);
- int groups_per_procs = procs_size / grp_size;
- gstart = pid * groups_per_procs;
- gend = (pid+1) * groups_per_procs;
- wstart = 0;
- wend = grp_size;
- }
- for (int gid = gstart; gid < gend; gid++) {
- for (int wid = wstart; wid < wend; wid++) {
- auto *worker = Worker::Create(job);
- worker->Init(nthreads++,gid, wid);
- workers.push_back(worker);
- }
- }
- return workers;
-}
-
-void Trainer::Resume(JobProto* jobConf) {
- tinydir_dir dir;
- string folder = Cluster::Get()->checkpoint_folder();
- tinydir_open(&dir, folder.c_str());
- int latest_step = 0;
- // there would be multi checkpoint files (from diff workers) for one step
- vector<string> ck_files;
- // iterate all files to get the files for the last checkpoint
- while (dir.has_next) {
- tinydir_file file;
- tinydir_readfile(&dir, &file);
- tinydir_next(&dir);
- char* ch = strstr(file.name, "step");
- if (ch == nullptr) {
- if (file.name[0] != '.')
- LOG(INFO) << "Irregular file in checkpoint folder: " << file.name;
- continue;
- }
-
- LOG(INFO) << "Add checkpoint file for resume: " << ch;
- int step = atoi(ch+4);
- if (step == latest_step) {
- ck_files.push_back(file.name);
- } else if(step > latest_step) {
- latest_step = step;
- ck_files.clear();
- ck_files.push_back(string(file.name));
- }
- }
-
- if (latest_step > 0) {
- jobConf->set_step(latest_step);
- if (!jobConf->has_reset_param_version())
- jobConf->set_reset_param_version(false);
- jobConf->clear_checkpoint_path();
- for (auto ck_file : ck_files)
- jobConf->add_checkpoint_path(folder + "/" + ck_file);
- }
- tinydir_close(&dir);
-}
-
-void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
- // register job to zookeeper at the beginning
- auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster());
- if (resume)
- Resume(job);
-
- router_ = new Router();
- router_->Bind(kInprocRouterEndpoint);
- const string hostip = cluster->hostip();
- int port = router_->Bind("tcp://" + hostip + ":*");
- // register endpoint to zookeeper
- cluster->Register(getpid(), hostip + ":" + std::to_string(port));
-
- int nthreads = 1;
- const vector<Worker*> workers = CreateWorkers(nthreads, *job);
- nthreads += workers.size();
- const vector<Server*> servers = CreateServers(nthreads, *job);
- SetupWorkerServer(*job, workers, servers);
-
-#ifdef USE_MPI
- for (int i = 0; i < nthreads; i++)
- MPIQueues.push_back(make_shared<SafeQueue>());
-#endif
- vector<std::thread> threads;
- for(auto server : servers)
- threads.push_back(std::thread(&Server::Run, server));
- for(auto worker : workers)
- threads.push_back(std::thread(&Worker::Run, worker));
- Run(workers, servers);
- for(auto& thread : threads)
- thread.join();
- for(auto server : servers)
- delete server;
- for(auto worker : workers)
- delete worker;
-}
-
-inline int bandwidth(int bytes, system_clock::time_point start) {
- auto now=system_clock::now();
- auto duration=duration_cast<std::chrono::milliseconds> (now - start);
- return static_cast<int>(bytes*1000.f/duration.count());
-}
-
-void Trainer::Run(
- const vector<Worker*>& workers,
- const vector<Server*>& servers) {
- int nworkers = workers.size(), nservers = servers.size();
- auto cluster = Cluster::Get();
- procs_id_ = cluster->procs_id();
- LOG(INFO) << "Stub in process " << procs_id_ << " starts";
-
- // for sync among server groups
- auto start = std::chrono::system_clock::now();
- float trans_size = 0.f; // total size of msg transferred since start time
- int sync_server_id = 0;
- int max_bandwidth = cluster->bandwidth();
- int nserver_grps = cluster->nserver_groups();
-
- map<int, Dealer*> inter_dealers; // for sending msg to other procs
-
- std::queue<Msg*> msg_queue;
- Poller poll(router_);
- bool stop=false;
- while (!stop || !msg_queue.empty()) {
- if (msg_queue.empty()) {
- // if the poll time is large, then the poller may not expire
- // if it is small, then many reminder messages will be sent which may
- // slow done the process of other request. TODO tune it.
- auto *sock = poll.Wait(cluster->poll_time());
- if (poll.Terminated()) {
- LOG(ERROR) << "Connection broken!";
- exit(0);
- } else if (sock == nullptr) {
- if (nserver_grps > 1 && bandwidth(trans_size, start) < max_bandwidth) {
- Msg* msg = GenSyncReminderMsg(sync_server_id, servers);
- router_->Send(&msg) ;
- sync_server_id = (sync_server_id + 1) % nservers;
- }
- continue;
- }
- Msg* msg = router_->Receive();
- msg_queue.push(msg);
- }
- Msg* msg = msg_queue.front();
- msg_queue.pop();
- int type = msg->type(), dst = msg->dst(), flag = AddrType(dst);
- if (flag == kStub && (AddrProc(dst) == procs_id_ || AddrGrp(dst) == -1)) {
- if (type == kConnect) {
- DeleteMsg(&msg);
- } else if (type == kMetric) {
- DisplayMetric(&msg);
- } else if (type == kStop) {
- int src_flag = AddrType(msg->src());
- if (src_flag == kServer) nservers--;
- else if (src_flag == kWorkerParam) nworkers--;
- DeleteMsg(&msg);
- if (nworkers == 0 && nservers == 0) break;
- } else if (nserver_grps > 0) {
- HandleLocalMsg(&msg_queue, &msg);
- } else {
- DeleteMsg(&msg);
- }
- } else {
- int dst_procs = AddrProc(dst);
- if (flag != kStub)
- dst_procs = cluster->ProcsIDOf(AddrGrp(dst), AddrID(dst), flag);
- if (dst_procs != procs_id_) {
- if (bandwidth(trans_size, start) <= cluster->bandwidth()) {
- start = std::chrono::system_clock::now();
- trans_size = 0;
- }
- trans_size += msg->size();
-
- if (inter_dealers.find(dst_procs) == inter_dealers.end())
- inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs);
- inter_dealers[dst_procs]->Send(&msg);
- } else {
- if (type == kSyncRequest)
- msg->AddFormatFrame("i", max_bandwidth - bandwidth(trans_size, start));
- router_->Send(&msg);
- }
- }
- }
- LOG(ERROR) << "Stub in process " << procs_id_ << " stops";
- for (auto& entry : inter_dealers)
- delete entry.second;
-}
-
-Msg* Trainer::GenSyncReminderMsg(int server, const vector<Server*>& servers ) {
- Msg* msg = new Msg();
- msg->set_src(Addr(-1,-1, kStub));
- msg->set_dst(Addr(servers[server]->grp_id(), servers[server]->id(), kServer));
- msg->set_type(kSyncReminder);
- return msg;
-}
-
-void Trainer::DisplayMetric(Msg** msg) {
- Msg* msgg = *msg;
- // only display metrics from the first group
- if (AddrGrp(msgg->src()) == 0) {
- int step = msgg->trgt_version();
- char prefix[128];
- msgg->ParseFormatFrame("s", prefix);
- CHECK(msgg->NextFrame());
- const string perf(static_cast<char*>(msgg->FrameData()), msgg->FrameSize());
- Metric cur(perf);
- LOG(ERROR) << prefix << " step-" << step <<", " << cur.ToLogString();
- }
- DeleteMsg(msg);
-}
-
-Dealer* Trainer::CreateInterProcsDealer(int dst_procs) {
- // forward to other procs
- auto cluster = Cluster::Get();
- auto dealer = new Dealer();
- while(cluster->endpoint(dst_procs)=="") {
- //kCollectSleepTime));
- std::this_thread::sleep_for(std::chrono::milliseconds(3000));
- LOG(ERROR)<<"waiting for procs "<< dst_procs<<" to register";
- }
- dealer->Connect("tcp://"+cluster->endpoint(dst_procs));
- return dealer;
-}
-
-void Trainer::HandleLocalMsg(queue<Msg*>* msg_queue, Msg** msg) {
- Msg* msgg = *msg;
- int paramid = ParamID(msgg->trgt_val());
- int type = msgg->type();
- int grp;
- ParamEntry *entry = nullptr;
- switch (type) { // TODO process other requests, e.g. RESTful
- case kUpdate:
- grp = AddrGrp(msgg->src());
- entry = worker_shard_.at(Hash(grp, paramid));
- for(auto update_msg : HandleUpdate(entry, msg))
- msg_queue->push(update_msg);
- break;
- case kRUpdate:
- grp = AddrGrp(msgg->dst());
- entry = worker_shard_.at(Hash(grp, paramid));
- HandleUpdateResponse(entry, msg);
- break;
- case kGet:
- grp = AddrGrp(msgg->src());
- entry = worker_shard_.at(Hash(grp, paramid));
- for(auto get_msg : HandleGet(entry, msg))
- msg_queue->push(get_msg);
- break;
- case kRGet:
- grp = AddrGrp(msgg->dst());
- entry = worker_shard_.at(Hash(grp, paramid));
- HandleGetResponse(entry, msg);
- break;
- case kPut:
- grp = AddrGrp(msgg->src());
- entry = worker_shard_.at(Hash(grp, paramid));
- for(auto put_msg : HandlePut(entry, msg))
- msg_queue->push(put_msg);
- break;
- default:
- LOG(ERROR)<<"Unknow message type:"<<type;
- break;
- }
-}
-
-void Trainer::GenMsgs(int type, int version, ParamEntry* entry,
- Msg* msg, vector<Msg*> *ret) {
- int src_grp = AddrGrp(msg->src());
- int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group();
- auto param=entry->shares.at(0);
- for (int idx = 0 ; idx < param->num_slices(); idx++) {
- int slice_id =param->slice_start() + idx;
- int server = slice2server_[slice_id];
- int procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer);
- Msg* new_msg = nullptr;
- if (type == kPut) {
- CHECK_GT(entry->num_total, 0);
- new_msg = param->GenPutMsg(procs != procs_id_, idx);
- // new_msg = param->GenPutMsg(true, idx);
- new_msg->AddFormatFrame("i", entry->num_total);
- } else if (type == kGet) {
- new_msg = param->GenGetMsg(procs != procs_id_, idx);
- // new_msg = param->GenGetMsg(true, idx);
- } else if (type == kUpdate) {
- new_msg = param->GenUpdateMsg(procs != procs_id_, idx);
- // new_msg = param->GenUpdateMsg(true, idx);
- new_msg->AddFormatFrame("i", entry->num_local);
- } else {
- LOG(FATAL) << "Wrong type";
- }
- new_msg->set_trgt(ParamTrgt(param->owner(), slice_id), version);
- new_msg->set_src(Addr(src_grp, procs_id_, kStub));
- new_msg->set_dst(Addr(dst_grp, server, kServer));
- ret->push_back(new_msg);
- }
-}
-
-const vector<Msg*> Trainer::HandleGet(ParamEntry* entry, Msg** msg) {
- vector<Msg*> ret;
- int version = (*msg)->trgt_version();
- if (version > entry->next_version) {
- entry->next_version = version;
- GenMsgs(kGet, version, entry, *msg, &ret);
- }
- DeleteMsg(msg);
- return ret;
-}
-
-const vector<Msg*> Trainer::HandleUpdate(ParamEntry *entry, Msg** msg) {
- vector<Msg*> ret;
- entry->num_update++;
- if (entry->num_update >= entry->num_local) {
- // average local gradient
- if (entry->num_local > 1) {
- auto it = entry->shares.begin();
- float* sum = (*it)->mutable_grad()->mutable_cpu_data();
- for (++it; it != entry->shares.end(); it++) {
- float* grad = (*it)->mutable_grad()->mutable_cpu_data();
- for (int i = 0; i < (*it)->size(); i++) {
- sum[i] += grad[i];
- }
- }
- }
- int step = (*msg)->trgt_version();
- GenMsgs(kUpdate, step, entry, *msg, &ret);
- entry->num_update = 0;
- }
- DeleteMsg(msg);
- return ret;
-}
-
-const vector<Msg*> Trainer::HandlePut(ParamEntry* entry, Msg** msg) {
- vector<Msg*> ret;
- int version = (*msg)->trgt_version();
- GenMsgs(kPut, version, entry, *msg, &ret);
- DeleteMsg(msg);
- return ret;
-}
-
-void Trainer::HandleGetResponse(ParamEntry* entry, Msg** msg) {
- int version = (*msg)->trgt_version();
- int sliceid = SliceID((*msg)->trgt_val());
- auto param = entry->shares.at(0);
- if (param->ParseGetResponseMsg(*msg, sliceid-param->slice_start()))
- param->set_version(version);
- DeleteMsg(msg);
-}
-
-void Trainer::HandleUpdateResponse(ParamEntry* entry, Msg** msg) {
- int version = (*msg)->trgt_version();
- int sliceid = SliceID((*msg)->trgt_val());
- auto param = entry->shares.at(0);
- if (param->ParseUpdateResponseMsg(*msg, sliceid-param->slice_start()))
- param->set_version(version);
- DeleteMsg(msg);
-}
-} /* singa */
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/utils/blob.cc
----------------------------------------------------------------------
diff --git a/src/utils/blob.cc b/src/utils/blob.cc
index b27f7db..4a9d681 100644
--- a/src/utils/blob.cc
+++ b/src/utils/blob.cc
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
-*
+*
* http://www.apache.org/licenses/LICENSE-2.0
-*
+*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -192,7 +192,6 @@ void SyncedMemory::to_gpu() {
case UNINITIALIZED:
CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
//CUDA_CHECK(cudaMemset(gpu_ptr_, 0, N));
- //
CUDA_CHECK(cudaMemset(gpu_ptr_, 0, size_));
head_ = HEAD_AT_GPU;
break;