Posted to commits@singa.apache.org by wa...@apache.org on 2015/07/11 10:54:00 UTC
[2/3] incubator-singa git commit: SINGA-29 Update NeuralNet class to
enable customizing layer partition type
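For orientation, a minimal sketch (not part of this patch) of how the per-layer
override introduced here is meant to be used, based on the NetProto/LayerProto
fields and the NeuralNet::Create signature changed below; the layer name, type,
and npartitions value are illustrative only:

    #include <memory>
    #include "proto/model.pb.h"
    #include "neuralnet/neuralnet.h"

    // Layers default to the net-wide partition_dim; a single layer may
    // override it. Layers that leave partition_dim unset inherit the net
    // value inside NeuralNet::Create (see neuralnet.cc below).
    std::shared_ptr<singa::NeuralNet> BuildPartitionedNet() {
      singa::NetProto conf;
      conf.set_partition_dim(0);  // net-wide default: data partition (dim 0)
      singa::LayerProto* fc = conf.add_layer();
      fc->set_name("fc1");
      fc->set_type(singa::LayerProto_LayerType_kInnerProduct);
      fc->set_partition_dim(1);   // this layer alone uses model partition
      return singa::NeuralNet::Create(conf, singa::kTrain, /*npartitions=*/2);
    }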
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/neuralnet/layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc
index db13824..b40b676 100644
--- a/src/neuralnet/layer.cc
+++ b/src/neuralnet/layer.cc
@@ -13,18 +13,47 @@ using namespace mshadow;
using namespace mshadow::expr;
namespace singa {
+inline Tensor<cpu, 4> Tensor4(Blob<float>* blob) {
+ const vector<int>& shape = blob->shape();
+ Tensor<cpu, 4> tensor(blob->mutable_cpu_data(),
+ Shape4(shape[0], shape[1], shape[2], shape[3]));
+ return tensor;
+}
+
+inline Tensor<cpu, 3> Tensor3(Blob<float>* blob){
+ const vector<int>& shape = blob->shape();
+ Tensor<cpu, 3> tensor(blob->mutable_cpu_data(),
+ Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
+ return tensor;
+}
+inline Tensor<cpu, 2> Tensor2(Blob<float>* blob){
+ const vector<int>& shape = blob->shape();
+ Tensor<cpu, 2> tensor(blob->mutable_cpu_data(),
+ Shape2(shape[0], blob->count() / shape[0]));
+ return tensor;
+}
+inline Tensor<cpu, 1> Tensor1(Blob<float>* blob){
+ Tensor<cpu, 1> tensor(blob->mutable_cpu_data(), Shape1(blob->count()));
+ return tensor;
+}
/************ Implementation for ConvolutionLayer *************************/
-void ConvolutionLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- CHECK_EQ(srclayers.size(),1);
+ConvolutionLayer::~ConvolutionLayer() {
+ delete weight_;
+ delete bias_;
+}
+void ConvolutionLayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
ConvolutionProto conv_conf=proto.convolution_conf();
kernel_=conv_conf.kernel();
CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
pad_=conv_conf.pad();
stride_=conv_conf.stride();
num_filters_=conv_conf.num_filters();
- const vector<int>& srcshape=srclayers[0]->data(this).shape();
+ if(partition_dim() > 0)
+ num_filters_ /= npartitions;
+
+ const vector<int>& srcshape=srclayers_[0]->data(this).shape();
int dim=srcshape.size();
CHECK_GT(dim, 2);
width_=srcshape[dim-1];
@@ -45,32 +74,18 @@ void ConvolutionLayer::Setup(const LayerProto& proto,
col_grad_.Reshape(vector<int>{col_height_, col_width_});
Factory<Param>* factory=Singleton<Factory<Param>>::Instance();
- weight_=shared_ptr<Param>(factory->Create("Param"));
+ weight_ = factory->Create("Param");
weight_->Setup(proto.param(0), vector<int>{num_filters_, col_height_});
- bias_=shared_ptr<Param>(factory->Create("Param"));
+ bias_ = factory->Create("Param");
bias_->Setup(proto.param(1), vector<int>{num_filters_});
}
-void ConvolutionLayer::SetupAfterPartition(const LayerProto& proto,
- const vector<int> &shape,
- const vector<SLayer>& srclayers){
- LayerProto newproto(proto);
- ConvolutionProto *conv_conf=newproto.mutable_convolution_conf();
- conv_conf->set_num_filters(shape[1]);
- Setup(newproto, srclayers);
-}
-
-void ConvolutionLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){
- Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
- Shape4(batchsize_, channels_, height_, width_));
- Tensor<cpu, 3> data(data_.mutable_cpu_data(),
- Shape3(batchsize_, num_filters_, conv_height_* conv_width_));
- Tensor<cpu, 2> col(col_data_.mutable_cpu_data(),
- Shape2(col_height_, col_width_));
- Tensor<cpu, 2> weight(weight_->mutable_cpu_data(),
- Shape2(num_filters_, col_height_));
- Tensor<cpu, 1> bias(bias_->mutable_cpu_data(),
- Shape1(num_filters_));
+void ConvolutionLayer::ComputeFeature(Phase phase, Metric* perf){
+ auto src = Tensor4(srclayers_[0]->mutable_data(this));
+ auto data = Tensor3(&data_);
+ auto col = Tensor2(&col_data_);
+ auto weight = Tensor2(weight_->mutable_data());
+ auto bias = Tensor1(bias_->mutable_data());
for(int n=0;n<batchsize_;n++){
if(pad_>0)
@@ -82,144 +97,126 @@ void ConvolutionLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclaye
data+=broadcast<1>(bias, data.shape);
}
-void ConvolutionLayer::ComputeGradient(const vector<SLayer>& srclayers) {
- Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
- Shape4(batchsize_, channels_, height_, width_));
- Tensor<cpu, 2> col(col_data_.mutable_cpu_data(),
- Shape2(col_height_, col_width_));
- Tensor<cpu, 2> weight(weight_->mutable_cpu_data(),
- Shape2(num_filters_, col_height_));
+void ConvolutionLayer::ComputeGradient(Phase phase) {
+ auto src = Tensor4(srclayers_[0]->mutable_data(this));
+ auto col = Tensor2(&col_data_);
+ auto weight = Tensor2(weight_->mutable_data());
+
+ auto grad = Tensor3(&grad_);
+ auto gcol = Tensor2(&col_grad_);
+ auto gweight = Tensor2(weight_->mutable_grad());
+ auto gbias = Tensor1(bias_->mutable_grad());
- Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this);
+ Blob<float>* gsrcblob=srclayers_[0]->mutable_grad(this);
Tensor<cpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_));
if(gsrcblob!=nullptr)
gsrc.dptr=gsrcblob->mutable_cpu_data();
- Tensor<cpu, 3> grad(grad_.mutable_cpu_data(),
- Shape3(batchsize_, num_filters_, conv_height_* conv_width_));
- Tensor<cpu, 2> gcol(col_grad_.mutable_cpu_data(),
- Shape2(col_height_, col_width_));
- Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(),
- Shape2(num_filters_, col_height_));
- Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(),
- Shape1(num_filters_));
-
- gweight=0.0f;
gbias=sumall_except_dim<1>(grad);
- Shape<3> padshape(gsrc.shape.SubShape());
- padshape[0]+=2*pad_;padshape[1]+=2*pad_;
- Shape<2> imgshape=Shape2(height_, width_);
+
+ gweight = 0.0f;
+ Shape<3> padshp(gsrc.shape.SubShape());
+ padshp[0] += 2 * pad_;
+ padshp[1] += 2 * pad_;
+ Shape<2> imgshp = Shape2(height_, width_);
for(int n=0;n<batchsize_;n++){
if(pad_>0)
col=unpack_patch2col(pad(src[n], pad_), kernel_, stride_);
else
col=unpack_patch2col(src[n], kernel_, stride_);
- gweight+=dot(grad[n], col.T());
+ gweight += dot(grad[n], col.T());
if(gsrcblob!=nullptr){
- gcol=dot(weight.T(), grad[n]);
- gsrc[n]=crop(pack_col2patch(gcol, padshape, kernel_, stride_), imgshape);
+ gcol = dot(weight.T(), grad[n]);
+ gsrc[n] = crop(pack_col2patch(gcol, padshp, kernel_, stride_), imgshp);
}
}
}
/****************** Implementation for DropoutLayer ***********************/
-void DropoutLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- data_.ReshapeLike(srclayers[0]->data(this));
- grad_.ReshapeLike(*srclayers[0]->mutable_grad(this));
- mask_.Reshape(srclayers[0]->data(this).shape());
- pdrop_=proto.dropout_conf().dropout_ratio();
-}
-
-void DropoutLayer::SetupAfterPartition(const LayerProto& proto,
- const vector<int> &shape,
- const vector<SLayer>& srclayers){
- Setup(proto, srclayers);
+void DropoutLayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
+ data_.ReshapeLike(srclayers_[0]->data(this));
+ grad_.ReshapeLike(*srclayers_[0]->mutable_grad(this));
+ mask_.Reshape(srclayers_[0]->data(this).shape());
+ pdrop_ = proto.dropout_conf().dropout_ratio();
}
-void DropoutLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers) {
+void DropoutLayer::ComputeFeature(Phase phase, Metric* perf) {
// check training
- if(phase!= kTrain){//!training){
- data_.CopyFrom(srclayers[0]->data(this));
+ if(phase != kTrain){//!training){
+ data_.CopyFrom(srclayers_[0]->data(this));
return;
}
float pkeep=1-pdrop_;
- Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count()));
+ auto mask = Tensor1(&mask_);
mask = F<op::threshold>(TSingleton<Random<cpu>>::Instance()\
->uniform(mask.shape), pkeep ) * (1.0f/pkeep);
- Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
- Blob<float>* srcblob=srclayers[0]->mutable_data(this);
- Tensor<cpu, 1> src(srcblob->mutable_cpu_data(), Shape1(srcblob->count()));
- data=src*mask;
-}
-
-void DropoutLayer::ComputeGradient(const vector<SLayer>& srclayers) {
- Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(data_.count()));
- Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count()));
- Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this);
- Tensor<cpu, 1> gsrc(gsrcblob->mutable_cpu_data(), Shape1(gsrcblob->count()));
- gsrc=grad*mask;
-}
-/**************** Implementation for InnerProductLayer********************/
-void InnerProductLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- CHECK_EQ(srclayers.size(),1);
- const auto& src=srclayers[0]->data(this);
+ auto data = Tensor1(&data_);
+ auto src = Tensor1(srclayers_[0]->mutable_data(this));
+ data = src * mask;
+}
+
+void DropoutLayer::ComputeGradient(Phase phase) {
+ auto mask = Tensor1(&mask_);
+ auto grad = Tensor1(&grad_);
+ auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this));
+ gsrc = grad * mask;
+}
+
+/*********** Implementation for InnerProductLayer**********/
+InnerProductLayer::~InnerProductLayer() {
+ delete weight_;
+ delete bias_;
+}
+void InnerProductLayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
+ CHECK_EQ(srclayers_.size(), 1);
+ const auto& src=srclayers_[0]->data(this);
batchsize_=src.shape()[0];
vdim_=src.count()/batchsize_;
hdim_=proto.innerproduct_conf().num_output();
+ if(partition_dim()>0)
+ hdim_ /= npartitions;
data_.Reshape(vector<int>{batchsize_, hdim_});
grad_.ReshapeLike(data_);
Factory<Param>* factory=Singleton<Factory<Param>>::Instance();
- weight_=shared_ptr<Param>(factory->Create("Param"));
- bias_=shared_ptr<Param>(factory->Create("Param"));
+ weight_ = factory->Create("Param");
+ bias_ = factory->Create("Param");
weight_->Setup(proto.param(0), vector<int>{vdim_, hdim_});
bias_->Setup(proto.param(1), vector<int>{hdim_});
}
-void InnerProductLayer::SetupAfterPartition(const LayerProto& proto,
- const vector<int> &shape,
- const vector<SLayer>& srclayers){
- LayerProto newproto(proto);
- InnerProductProto * innerproto=newproto.mutable_innerproduct_conf();
- innerproto->set_num_output(shape[1]);
- Setup(newproto, srclayers);
-}
-
-void InnerProductLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers) {
- Tensor<cpu, 2> data(data_.mutable_cpu_data(), Shape2(batchsize_,hdim_));
- CHECK_EQ(srclayers[0]->data(this).count(), batchsize_*vdim_);
- Tensor<cpu, 2> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
- Shape2(batchsize_,vdim_));
- Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_));
- Tensor<cpu, 1> bias(bias_->mutable_cpu_data(), Shape1(hdim_));
+
+void InnerProductLayer::ComputeFeature(Phase phase, Metric* perf) {
+ auto data = Tensor2(&data_);
+ auto src = Tensor2(srclayers_[0]->mutable_data(this));
+ auto weight = Tensor2(weight_->mutable_data());
+ auto bias = Tensor1(bias_->mutable_data());
data=dot(src, weight);
// repmat: repeat bias vector into batchsize rows
data+=repmat(bias, batchsize_);
}
-void InnerProductLayer::ComputeGradient(const vector<SLayer>& srclayers) {
- Tensor<cpu, 2> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
- Shape2(batchsize_,vdim_));
- Tensor<cpu, 2> grad(grad_.mutable_cpu_data(),Shape2(batchsize_,hdim_));
- Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_));
- Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(), Shape2(vdim_,hdim_));
- Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(), Shape1(hdim_));
+void InnerProductLayer::ComputeGradient(Phase phase) {
+ auto src = Tensor2(srclayers_[0]->mutable_data(this));
+ auto grad = Tensor2(&grad_);
+ auto weight = Tensor2(weight_->mutable_data());
+ auto gweight = Tensor2(weight_->mutable_grad());
+ auto gbias = Tensor1(bias_->mutable_grad());
gbias=sum_rows(grad);
gweight=dot(src.T(), grad);
- if(srclayers[0]->mutable_grad(this)!=nullptr){
- Tensor<cpu, 2> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
- Shape2(batchsize_,vdim_));
+ if(srclayers_[0]->mutable_grad(this)!=nullptr){
+ auto gsrc = Tensor2(srclayers_[0]->mutable_grad(this));
gsrc=dot(grad, weight.T());
}
}
/*****************************************************************************
* Implementation for LabelLayer
*****************************************************************************/
-void LabelLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- CHECK_EQ(srclayers.size(),1);
- int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
+void LabelLayer::Setup(const LayerProto& proto, int npartitions){
+ Layer::Setup(proto, npartitions);
+ CHECK_EQ(srclayers_.size(),1);
+ int batchsize=static_cast<DataLayer*>(srclayers_[0])->batchsize();
data_.Reshape(vector<int>{batchsize});
}
@@ -236,7 +233,7 @@ void LabelLayer::ParseRecords(Phase phase, const vector<Record>& records,
/*********************LMDBDataLayer**********************************/
-void LMDBDataLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){
+void LMDBDataLayer::ComputeFeature(Phase phase, Metric* perf){
if(random_skip_){
int nskip=rand()%random_skip_;
int n=0;
@@ -296,8 +293,8 @@ void LMDBDataLayer::ConvertDatumToSingleLableImageRecord(const Datum& datum,
}
}
-void LMDBDataLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
+void LMDBDataLayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
CHECK_EQ(mdb_env_create(&mdb_env_), MDB_SUCCESS) << "mdb_env_create failed";
CHECK_EQ(mdb_env_set_mapsize(mdb_env_, 1099511627776), MDB_SUCCESS); // 1TB
CHECK_EQ(mdb_env_open(mdb_env_,
@@ -325,21 +322,23 @@ void LMDBDataLayer::Setup(const LayerProto& proto,
ConvertDatumToSingleLableImageRecord(datum, record);
batchsize_=batchsize();
+ if(partition_dim() == 0)
+ batchsize_ /= npartitions;
records_.resize(batchsize_);
random_skip_=proto.lmdbdata_conf().random_skip();
}
/***************** Implementation for LRNLayer *************************/
-void LRNLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- CHECK_EQ(srclayers.size(),1);
+void LRNLayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
+ CHECK_EQ(srclayers_.size(),1);
lsize_ = proto.lrn_conf().local_size();
CHECK_EQ(lsize_ % 2, 1) << "LRN only supports odd values for local size";
knorm_=proto.lrn_conf().knorm();
alpha_ = proto.lrn_conf().alpha();
beta_ = proto.lrn_conf().beta();
- const vector<int>& s=srclayers[0]->data(this).shape();
+ const vector<int>& s=srclayers_[0]->data(this).shape();
data_.Reshape(s);
grad_.Reshape(s);
norm_.Reshape(s);
@@ -349,30 +348,22 @@ void LRNLayer::Setup(const LayerProto& proto,
width_=s[3];
}
-void LRNLayer::SetupAfterPartition(const LayerProto& proto,
- const vector<int> &shape,
- const vector<SLayer>& srclayers){
- Setup(proto, srclayers);
-}
-
-void LRNLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){
+void LRNLayer::ComputeFeature(Phase phase, Metric* perf) {
const float salpha = alpha_ / lsize_;
- Shape<4> s=Shape4(batchsize_,channels_, height_, width_);
- Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s);
- Tensor<cpu, 4> data(data_.mutable_cpu_data(), s);
- Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s);
+ auto src = Tensor4(srclayers_[0]->mutable_data(this));
+ auto data = Tensor4(&data_);
+ auto norm = Tensor4(&norm_);
// stores normalizer without power
norm= chpool<red::sum>( F<op::square>(src) , lsize_ ) * salpha + knorm_;
data = src * F<op::power>(norm, -beta_ );
}
-void LRNLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+void LRNLayer::ComputeGradient(Phase phase) {
const float salpha = alpha_ / lsize_;
- Shape<4> s=Shape4(batchsize_,channels_, height_, width_);
- Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s);
- Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s);
- Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s);
- Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), s);
+ auto src = Tensor4(srclayers_[0]->mutable_data(this));
+ auto norm = Tensor4(&norm_);
+ auto grad = Tensor4(&grad_);
+ auto gsrc = Tensor4(srclayers_[0]->mutable_grad(this));
gsrc = grad * F<op::power>( norm, -beta_ );
gsrc += ( - 2.0f * beta_ * salpha ) * chpool<red::sum>(
@@ -448,11 +439,11 @@ void MnistLayer::ParseRecords(Phase phase,
}
CHECK_EQ(dptr, blob->mutable_cpu_data()+blob->count());
}
-void MnistLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- CHECK_EQ(srclayers.size(),1);
- int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
- Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample();
+void MnistLayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
+ CHECK_EQ(srclayers_.size(),1);
+ int batchsize=static_cast<DataLayer*>(srclayers_[0])->batchsize();
+ Record sample=static_cast<DataLayer*>(srclayers_[0])->sample();
kernel_=proto.mnist_conf().kernel();
sigma_=proto.mnist_conf().sigma();
alpha_=proto.mnist_conf().alpha();
@@ -475,9 +466,9 @@ void MnistLayer::Setup(const LayerProto& proto,
}
/******************** Implementation for PoolingLayer******************/
-void PoolingLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- CHECK_EQ(srclayers.size(),1);
+void PoolingLayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
+ CHECK_EQ(srclayers_.size(),1);
PoolingProto pool_conf = proto.pooling_conf();
kernel_=pool_conf.kernel();
stride_=pool_conf.stride();
@@ -487,7 +478,7 @@ void PoolingLayer::Setup(const LayerProto& proto,
|| pool_ == PoolingProto_PoolMethod_MAX)
<< "Padding implemented only for average and max pooling.";
- const auto& srcshape=srclayers[0]->data(this).shape();
+ const auto& srcshape=srclayers_[0]->data(this).shape();
int dim=srcshape.size();
CHECK_GT(dim,2);
width_ = srcshape[dim-1];
@@ -503,68 +494,49 @@ void PoolingLayer::Setup(const LayerProto& proto,
grad_.ReshapeLike(data_);
}
-void PoolingLayer::SetupAfterPartition(const LayerProto& proto,
- const vector<int> &shape,
- const vector<SLayer>& srclayers){
- Setup(proto, srclayers);
-}
-
-void PoolingLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){
- Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
- Shape4(batchsize_, channels_, height_, width_));
- Tensor<cpu, 4> data(data_.mutable_cpu_data(),
- Shape4(batchsize_, channels_, pooled_height_, pooled_width_));
+void PoolingLayer::ComputeFeature(Phase phase, Metric* perf) {
+ auto src = Tensor4(srclayers_[0]->mutable_data(this));
+ auto data = Tensor4(&data_);
if(pool_ == PoolingProto_PoolMethod_MAX)
data=pool<red::maximum>(src, kernel_, stride_);
else if(pool_ == PoolingProto_PoolMethod_AVE)
- data=pool<red::sum>(src, kernel_, stride_)
- *(1.0f/(kernel_*kernel_));
+ data=pool<red::sum>(src, kernel_, stride_) *(1.0f/(kernel_*kernel_));
}
/*
* partition only on num/channel dim
* assume grad and data have the same partition
*/
-void PoolingLayer::ComputeGradient(const vector<SLayer>& srclayers) {
- Shape<4> s1= Shape4(batchsize_, channels_, height_, width_);
- Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),s1);
- Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),s1);
- Shape<4> s2= Shape4(batchsize_, channels_, pooled_height_, pooled_width_);
- Tensor<cpu, 4> data(data_.mutable_cpu_data(), s2);
- Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s2);
+void PoolingLayer::ComputeGradient(Phase phase) {
+ auto src = Tensor4(srclayers_[0]->mutable_data(this));
+ auto gsrc = Tensor4(srclayers_[0]->mutable_grad(this));
+ auto data = Tensor4(&data_);
+ auto grad = Tensor4(&grad_);
if(pool_ == PoolingProto_PoolMethod_MAX)
- gsrc = unpool<red::maximum>(src, data, grad, kernel_, stride_);
+ gsrc = unpool<red::maximum>(src, data, grad, kernel_, stride_);
else if(pool_ == PoolingProto_PoolMethod_AVE)
- gsrc = unpool<red::sum>(src, data, grad, kernel_, stride_)
- *(1.0f/(kernel_*kernel_));
+ gsrc = unpool<red::sum>(src, data, grad, kernel_, stride_)
+ *(1.0f/(kernel_*kernel_));
}
/***************** Implementation for ReLULayer *****************************/
-void ReLULayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- data_.ReshapeLike(srclayers[0]->data(this));
- grad_.ReshapeLike(*(srclayers[0]->mutable_grad(this)));
-}
-
-void ReLULayer::SetupAfterPartition(const LayerProto& proto,
- const vector<int> &shape,
- const vector<SLayer>& srclayers){
- Setup(proto, srclayers);
+void ReLULayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
+ data_.ReshapeLike(srclayers_[0]->data(this));
+ grad_.ReshapeLike(*(srclayers_[0]->mutable_grad(this)));
}
-void ReLULayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){
- Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
- Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
- Shape1(data_.count()));
+void ReLULayer::ComputeFeature(Phase phase, Metric* perf) {
+ auto data = Tensor1(&data_);
+ auto src = Tensor1(srclayers_[0]->mutable_data(this));
data=F<op::relu>(src);
}
-void ReLULayer::ComputeGradient(const vector<SLayer>& srclayers) {
- Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count()));
- Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
- Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
- Shape1(data_.count()));
+void ReLULayer::ComputeGradient(Phase phase) {
+ auto data = Tensor1(&data_);
+ auto grad = Tensor1(&grad_);
+ auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this));
gsrc=F<op::relu_grad>(data)*grad;
}
@@ -573,7 +545,7 @@ void ReLULayer::ComputeGradient(const vector<SLayer>& srclayers) {
void RGBImageLayer::ParseRecords(Phase phase,
const vector<Record>& records, Blob<float>* blob){
const vector<int>& s=blob->shape();
- Tensor<cpu, 4> images(data_.mutable_cpu_data(), Shape4(s[0],s[1],s[2],s[3]));
+ auto images = Tensor4(&data_);
const SingleLabelImageRecord& r=records.at(0).image();
Tensor<cpu, 3> raw_image(Shape3(r.shape(0),r.shape(1),r.shape(2)));
AllocSpace(raw_image);
@@ -625,14 +597,14 @@ void RGBImageLayer::ParseRecords(Phase phase,
if(cropsize_)
FreeSpace(croped_image);
}
-void RGBImageLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- CHECK_EQ(srclayers.size(),1);
+void RGBImageLayer::Setup(const LayerProto& proto, int npartitions) {
+ ParserLayer::Setup(proto, npartitions);
+ CHECK_EQ(srclayers_.size(),1);
scale_=proto.rgbimage_conf().scale();
cropsize_=proto.rgbimage_conf().cropsize();
mirror_=proto.rgbimage_conf().mirror();
- int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
- Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample();
+ int batchsize=static_cast<DataLayer*>(srclayers_[0])->batchsize();
+ Record sample=static_cast<DataLayer*>(srclayers_[0])->sample();
vector<int> shape;
shape.push_back(batchsize);
for(int x: sample.image().shape()){
@@ -663,7 +635,7 @@ void RGBImageLayer::Setup(const LayerProto& proto,
}
/***************Implementation for ShardDataLayer**************************/
-void ShardDataLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){
+void ShardDataLayer::ComputeFeature(Phase phase, Metric* perf){
if(random_skip_){
int nskip=rand()%random_skip_;
LOG(INFO)<<"Random Skip "<<nskip<<" records, there are "<<shard_->Count()
@@ -683,67 +655,55 @@ void ShardDataLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers
}
}
-void ShardDataLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
+void ShardDataLayer::Setup(const LayerProto& proto, int npartitions) {
+ Layer::Setup(proto, npartitions);
shard_= std::make_shared<DataShard>(proto.sharddata_conf().path(),
DataShard::kRead);
string key;
shard_->Next(&key, &sample_);
batchsize_=proto.sharddata_conf().batchsize();
+ if(partition_dim() == 0)
+ batchsize_ /= npartitions;
records_.resize(batchsize_);
random_skip_=proto.sharddata_conf().random_skip();
}
/*******************Implementation of TanhLayer***************************/
-void TanhLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- data_.ReshapeLike(srclayers[0]->data(this));
- grad_.ReshapeLike(srclayers[0]->grad(this));
+void TanhLayer::Setup(const LayerProto& proto, int npartitions){
+ Layer::Setup(proto, npartitions);
+ data_.ReshapeLike(srclayers_[0]->data(this));
+ grad_.ReshapeLike(srclayers_[0]->grad(this));
}
-void TanhLayer::SetupAfterPartition(const LayerProto& proto,
- const vector<int> &shape,
- const vector<SLayer>& srclayers){
- Setup(proto, srclayers);
-}
-
-
-void TanhLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){
- Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
- Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
- Shape1(data_.count()));
+void TanhLayer::ComputeFeature(Phase phase, Metric* perf) {
+ auto data = Tensor1(&data_);
+ auto src = Tensor1(srclayers_[0]->mutable_data(this));
data=F<op::stanh>(src);
}
-void TanhLayer::ComputeGradient(const vector<SLayer>& srclayers) {
- Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
- Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count()));
- Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
- Shape1(data_.count()));
+void TanhLayer::ComputeGradient(Phase phase) {
+ auto data = Tensor1(&data_);
+ auto grad = Tensor1(&grad_);
+ auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this));
gsrc=F<op::stanh_grad>(data)*grad;
}
/************ Implementation for SoftmaxLossLayer *************************/
-void SoftmaxLossLayer::Setup(const LayerProto& proto,
- const vector<SLayer>& srclayers){
- CHECK_EQ(srclayers.size(),2);
- data_.Reshape(srclayers[0]->data(this).shape());
+void SoftmaxLossLayer::Setup(const LayerProto& proto, int npartitions) {
+ LossLayer::Setup(proto, npartitions);
+ CHECK_EQ(srclayers_.size(),2);
+ data_.Reshape(srclayers_[0]->data(this).shape());
batchsize_=data_.shape()[0];
dim_=data_.count()/batchsize_;
topk_=proto.softmaxloss_conf().topk();
metric_.Reshape(vector<int>{2});
scale_=proto.softmaxloss_conf().scale();
}
-void SoftmaxLossLayer::SetupAfterPartition(const LayerProto& proto,
- const vector<int> &shape,
- const vector<SLayer>& srclayers){
- Setup(proto, srclayers);
-}
-void SoftmaxLossLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers) {
+void SoftmaxLossLayer::ComputeFeature(Phase phase, Metric* perf) {
Shape<2> s=Shape2(batchsize_, dim_);
Tensor<cpu, 2> prob(data_.mutable_cpu_data(), s);
- Tensor<cpu, 2> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s);
+ Tensor<cpu, 2> src(srclayers_[0]->mutable_data(this)->mutable_cpu_data(), s);
Softmax(prob, src);
- const float* label=srclayers[1]->data(this).cpu_data();
+ const float* label=srclayers_[1]->data(this).cpu_data();
const float* probptr=prob.dptr;
float loss=0, precision=0;
for(int n=0;n<batchsize_;n++){
@@ -769,14 +729,13 @@ void SoftmaxLossLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclaye
probptr+=dim_;
}
CHECK_EQ(probptr, prob.dptr+prob.shape.Size());
- float *metric=metric_.mutable_cpu_data();
- metric[0]=loss*scale_/(1.0f*batchsize_);
- metric[1]=precision*scale_/(1.0f*batchsize_);
+ perf->Add("loss", loss*scale_/(1.0f*batchsize_));
+ perf->Add("accuracy", precision*scale_/(1.0f*batchsize_));
}
-void SoftmaxLossLayer::ComputeGradient(const vector<SLayer>& srclayers) {
- const float* label=srclayers[1]->data(this).cpu_data();
- Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this);
+void SoftmaxLossLayer::ComputeGradient(Phase phase) {
+ const float* label=srclayers_[1]->data(this).cpu_data();
+ Blob<float>* gsrcblob=srclayers_[0]->mutable_grad(this);
gsrcblob->CopyFrom(data_);
float* gsrcptr=gsrcblob->mutable_cpu_data();
for(int n=0;n<batchsize_;n++){
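Note the uniform Setup contract across the layers above: chain to
Layer::Setup(proto, npartitions), then shrink whatever dimension
partition_dim() selects. A sketch of a hypothetical layer following the same
pattern (member names mirror InnerProductLayer; not part of the patch):

    // Neuron layers divide their output dimension when partitioned on dim 1
    // (model partition); data layers instead divide batchsize_ when
    // partitioned on dim 0 (data partition), as LMDBDataLayer and
    // ShardDataLayer do above.
    void MyLayer::Setup(const LayerProto& proto, int npartitions) {
      Layer::Setup(proto, npartitions);
      CHECK_EQ(srclayers_.size(), 1);
      hdim_ = proto.innerproduct_conf().num_output();
      if (partition_dim() > 0)
        hdim_ /= npartitions;  // split the feature dimension
      batchsize_ = srclayers_[0]->data(this).shape()[0];
      data_.Reshape(vector<int>{batchsize_, hdim_});
      grad_.ReshapeLike(data_);
    }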
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
index 2240499..6d82734 100644
--- a/src/neuralnet/neuralnet.cc
+++ b/src/neuralnet/neuralnet.cc
@@ -1,22 +1,19 @@
#include <algorithm>
#include <queue>
-#include "proto/model.pb.h"
#include "neuralnet/neuralnet.h"
#include "utils/singleton.h"
-#include "utils/factory.h"
-#include "utils/graph.h"
-#include "utils/cluster.h"
namespace singa {
#define LayerT(x) LayerProto_LayerType_k##x
#define RegisterLayer(factory, id) \
- factory->Register(LayerProto_LayerType_k##id,\
+ factory->Register(LayerProto_LayerType_k##id, \
CreateInstance(id##Layer, Layer))
-void NeuralNet::RegisterLayers(){
- Factory<Layer>* factory=Singleton<Factory<Layer>>::Instance();
+void NeuralNet::RegisterLayers() {
+ Factory<Layer>* factory = Singleton<Factory<Layer>>::Instance();
+ // FooLayer's type is kFoo, register using Foo
RegisterLayer(factory, BridgeDst);
RegisterLayer(factory, BridgeSrc);
RegisterLayer(factory, Convolution);
@@ -37,402 +34,329 @@ void NeuralNet::RegisterLayers(){
RegisterLayer(factory, Split);
RegisterLayer(factory, Tanh);
}
-shared_ptr<NeuralNet> NeuralNet::SetupNeuralNet(const NetProto& np, Phase phase,
- int group_size){
+
+shared_ptr<NeuralNet> NeuralNet::Create(
+ const NetProto& conf,
+ Phase phase,
+ int npartitions) {
NetProto proto;
- proto.set_partition_type(np.partition_type());
- // exclude layers if necessary
- for(auto& layer:np.layer()){
- bool include=true;
- for(int x: layer.exclude()){
- if(x==phase)
- include=false;
+ proto.CopyFrom(conf);
+ proto.clear_layer();
+ // exclude layers according to phase
+ for (const auto& layer : conf.layer()) {
+ bool include = true;
+ for (auto x : layer.exclude()) {
+ if (x == phase)
+ include = false;
}
- if(include){
- LayerProto* lp=proto.add_layer();
+ if (include) {
+ LayerProto* lp = proto.add_layer();
lp->CopyFrom(layer);
+ // use the net's partition_dim if the layer does not set its own
+ if (!lp->has_partition_dim())
+ lp->set_partition_dim(proto.partition_dim());
}
}
- LOG(INFO)<<"NeuralNet config is "<<proto.DebugString();
- return make_shared<NeuralNet>(proto, group_size);
-}
-NeuralNet::NeuralNet(NetProto net_proto, int group_size) {
- group_size_=group_size;
- for(int i=0;i<net_proto.layer_size();i++){
- LayerProto * layer_proto=net_proto.mutable_layer(i);
- if(!layer_proto->has_partition_type())
- layer_proto->set_partition_type(net_proto.partition_type());
- }
+ LOG(INFO) << "NeuralNet config is\n" << proto.DebugString();
- LOG(INFO)<<"Construct Neural Net...";
- ConstructNeuralNet(net_proto);
- {
- string vis_folder=Cluster::Get()->vis_folder();
- std::ofstream fout(vis_folder+"/nopartition.json", std::ofstream::out);
- fout<<ToString();
- fout.flush();
- fout.close();
- }
- if(group_size_>1){
- PartitionNeuralNet();
- string vis_folder=Cluster::Get()->vis_folder();
- std::ofstream fout(vis_folder+"/partition.json", std::ofstream::out);
- fout<<ToString();
- fout.flush();
- fout.close();
- }
- for(auto layer: layers_){
- DLOG(INFO)<<layer->name();
- }
- for(auto& layer: layers_){
- for(shared_ptr<Param> p: layer->GetParams()){
- params_.push_back(p);
- }
- }
- LOG(INFO)<<"Neural Net constructed";
- // init all data members to avoid conflicts from multi-thread access
- losslayers();
- paramid2param(0);
- datalayers();
- parserlayers();
+ // TODO(wangwei) create net based on net type, e.g., directed, undirected, etc
+ auto net = std::make_shared<NeuralNet>(proto, npartitions);
+ return net;
}
-void NeuralNet::ConstructNeuralNet(const NetProto& net_proto){
- // construct graph, one node for one layer, identified by layer name
- map<string, LayerProto> protos;
- for (auto &layer_proto : net_proto.layer()){
- graph_.AddNode(layer_proto.name());
- protos[layer_proto.name()]=layer_proto;
- }
- for (auto &layer_proto : net_proto.layer())
- if(layer_proto.srclayers_size())
- for(const string& src: layer_proto.srclayers())
- graph_.AddEdge(src, layer_proto.name());
+NeuralNet::~NeuralNet() {
+ for (auto layer : layers_)
+ delete layer;
+}
- // topology sort
- graph_.Sort();
- //LOG(ERROR)<<"pure graph without partition\n"<< graph_.ToString();
+NeuralNet::NeuralNet(NetProto netproto, int npartitions) {
+ LOG(INFO) << "Constructing Neural Net...";
+ auto graph = CreateGraph(netproto, npartitions);
+ CreateNetFromGraph(graph, npartitions);
+ PrepareDataStructures();
+ for (Node* node : graph->nodes())
+ delete static_cast<LayerProto*>(node->proto);
+ delete graph;
+ LOG(INFO) << "Neural net constructed";
+}
- auto* factory=Singleton<Factory<Layer>>::Instance();
- // create Layers according to topology order
- for(SNode node: graph_.nodes()){
- shared_ptr<Layer> layer(factory->Create(protos[node->name()].type()));
- layer->Init(protos[node->name()]);
- name2layer_[node->name()]=layer;
+void NeuralNet::CreateNetFromGraph(Graph* graph, int npartitions) {
+ auto* factory = Singleton<Factory<Layer>>::Instance();
+ // create one layer per node
+ for (Node* node : graph->nodes()) {
+ auto layer = factory->Create(static_cast<LayerProto*>(node->proto)->type());
layers_.push_back(layer);
+ name2layer_[node->name] = layer;
}
-
- // connect Layers.
- for(SNode node: graph_.nodes()){
- auto layer=name2layer_[node->name()];
- for(SNode dst: node->dstnodes())
- layer->AddDstLayer(name2layer_[dst->name()]);
- for(SNode src: node->srcnodes())
- layer->AddSrcLayer(name2layer_[src->name()]);
+ // connect layers
+ for (Node* node : graph->nodes()) {
+ auto layer = name2layer_[node->name];
+ layer->clear_dstlayers();
+ for (Node* dst : node->dstnodes)
+ layer->add_dstlayer(name2layer_[dst->name]);
+ layer->clear_srclayers();
+ for (Node* src : node->srcnodes)
+ layer->add_srclayer(name2layer_[src->name]);
}
- // setup layer properties, e.g., shapes
- int paramid=0;
- for(auto& layer: layers_){
- layer->Setup();
- for(auto param: layer->GetParams())
- param->set_id(paramid++);
+ // setup layers
+ int paramid = 0;
+ map<string, string> layerinfo;
+ map<string, vector<Layer*>> share_param_layers;
+ for (Node* node : graph->nodes()) {
+ auto layer = name2layer_[node->name];
+ layer->Setup(*(static_cast<LayerProto*>(node->proto)), npartitions);
+ layerinfo[layer->name()] = IntVecToString(layer->data(nullptr).shape());
+ for (auto param : layer->GetParams())
+ param->set_id(paramid++);
+ if (layer->partition_dim() == 0)
+ share_param_layers[node->origin].push_back(layer);
}
- LOG(INFO)<<"network graph witout partition\n"<<ToString();
-}
-
-void NeuralNet::PartitionNeuralNet(){
- graph_=CreatePartitonedGraph(layers_, name2layer_);
- //DLOG(ERROR)<<"pure graph after partition\n"<<graph_.ToString();
- map<string, shared_ptr<Layer>> name2layer(name2layer_);
- map<string, vector<shared_ptr<Layer>>> share_conf_layers;
- name2layer_.clear();
- layers_.clear();
- int gsize=group_size_;
- auto* factory=Singleton<Factory<Layer>>::Instance();
- // create Layers according to topology order
- for(SNode node: graph_.nodes()){
- LayerProto proto;
- proto.set_name(node->name());
- proto.set_partitionid(node->val().partitionid);
- string origin=node->val().origin;
- if (origin=="kSlice"){
- proto.set_type(LayerT(Slice));
- SliceProto *slice=proto.mutable_slice_conf();
- slice->set_slice_dimension(node->val().slice_dimension);
- slice->set_slice_num(node->dstnodes().size());
- }else if(origin== "kConcate"){
- proto.set_type(LayerT(Concate));
- ConcateProto *concate=proto.mutable_concate_conf();
- concate->set_concate_dimension(node->val().concate_dimension);
- concate->set_concate_num(node->srcnodes().size());
- }else if(origin=="kSplit"){
- proto.set_type(LayerT(Split));
- SplitProto *split=proto.mutable_split_conf();
- split->set_num_splits(node->dstnodes().size());
- }else if(origin=="kBridgeSrc"){
- proto.set_type(LayerT(BridgeSrc));
- }else if(origin =="kBridgeDst"){
- proto.set_type(LayerT(BridgeDst));
- }else{
- CHECK(name2layer.find(node->val().origin)!=name2layer_.end())
- <<"Unkown origin for node "<<node->val().origin;
+ LOG(INFO) << "Neural net structure\n" << graph->ToJson(layerinfo);
+ // share Params for layers generated from the same origin layer
+ for (auto & entry : share_param_layers) {
+ auto owner = entry.second.begin();
+ auto owner_params = (*owner)->GetParams();
+ for (auto it = owner + 1; it != entry.second.end(); it++) {
+ auto params = (*it)->GetParams();
+ CHECK_EQ(params.size(), owner_params.size());
+ for (size_t i = 0; i < params.size(); i++)
+ params.at(i)->ShareData(owner_params.at(i));
}
- shared_ptr<Layer> newlayer;
- if(proto.has_type()){
- // layers added due to partition
- shared_ptr<Layer> layer(factory->Create(proto.type()));
- layer->Init(proto);
- newlayer=layer;
- }else{
- // partitioned layers from origin neuralnet
- auto oldlayer=name2layer.at(node->val().origin);
- vector<int> shape=oldlayer->shape(nullptr);
- if(oldlayer->partition_type()==kNone){
- newlayer=oldlayer;
- } else{
- int pdim=oldlayer->partition_dimension();
- shape[pdim]=shape[pdim]/gsize+
- ((node->val().partitionid==gsize-1)?shape[pdim]%gsize:0);
- shared_ptr<Layer> layer(factory->Create(oldlayer->type()));
- layer->Init(*oldlayer, shape);
- layer->set_name(node->name());
- newlayer=layer;
- if(oldlayer->partition_type()==kDataPartition)
- share_conf_layers[node->val().origin].push_back(newlayer);
- }
- newlayer->set_partitionid(node->val().partitionid);
- }
- layers_.push_back(newlayer);
- name2layer_[node->name()]=newlayer;
}
+}
- // connect Layers.
- for(SNode node: graph_.nodes()){
- auto layer=name2layer_[node->name()];
- layer->ClearDstLayers();
- for(SNode dst: node->dstnodes())
- layer->AddDstLayer(name2layer_[dst->name()]);
- layer->ClearSrcLayers();
- for(SNode src: node->srcnodes())
- layer->AddSrcLayer(name2layer_[src->name()]);
- }
+// add a node for SliceLayer between srcnode and dstnodes
+Node* SliceNode(Graph* graph, Node* srcnode,
+ const vector<Node*>& dstnodes, bool connect_dst) {
+ string name = srcnode->name + "<";
+ LayerProto *proto = new LayerProto();
+ proto->set_name(name);
+ proto->set_type(LayerProto_LayerType_kSlice);
+ proto->set_partition_id(
+ static_cast<LayerProto*>(srcnode->proto)->partition_id());
+ auto conf = proto->mutable_slice_conf();
+ conf->set_slice_dim(
+ static_cast<LayerProto*>(dstnodes[0]->proto)->partition_dim());
+ Node* node = new Node(name, "##" + name, proto->partition_id(), proto);
+ graph->AddNode(node);
+ graph->AddEdge(srcnode, node);
+ if (connect_dst)
+ for (Node* dst : dstnodes)
+ graph->AddEdge(node, dst);
+ return node;
+}
- LOG(INFO)<<"Adjacency matrix\n"<<ToAdjacency();
+// add a node for ConcateLayer between srcnodes and dstnode
+Node* ConcateNodes(Graph* graph, const vector<Node*>& srcnodes, Node* dstnode) {
+ string name = ">" + dstnode->name;
+ LayerProto *proto = new LayerProto();
+ proto->set_name(name);
+ proto->set_type(LayerProto_LayerType_kConcate);
+ proto->set_partition_id(
+ static_cast<LayerProto*>(dstnode->proto)->partition_id());
+ auto conf = proto->mutable_concate_conf();
+ conf->set_concate_dim(
+ static_cast<LayerProto*>(srcnodes[0]->proto)->partition_dim());
+ Node* node = new Node(name, "##" + name, proto->partition_id(), proto);
+ graph->AddNode(node);
+ graph->AddEdge(node, dstnode);
+ for (Node* src : srcnodes)
+ graph->AddEdge(src, node);
+ return node;
+}
- // set up layers after
- int paramid=0;
- for(shared_ptr<Layer> layer: layers_){
- const vector<int>& shape=layer->shape(nullptr);
- layer->SetupAfterPartition();
- for(auto param: layer->GetParams())
- param->set_id(paramid++);
- const vector<int>& newshape=layer->shape(nullptr);
- if(shape.size())
- CHECK(std::equal(shape.begin(),shape.end(),newshape.begin()));
- }
+// add a node for SplitLayer between srcnode and dstnodes
+Node* SplitNode(Graph* graph, Node* srcnode, const vector<Node*>& dstnodes) {
+ string name = srcnode->name + "+";
+ LayerProto *proto = new LayerProto();
+ proto->set_name(name);
+ proto->set_type(LayerProto_LayerType_kSplit);
+ proto->set_partition_id(
+ static_cast<LayerProto*>(srcnode->proto)->partition_id());
+ Node* node = new Node(name, "##" + name, proto->partition_id(), proto);
+ graph->AddNode(node);
+ graph->AddEdge(srcnode, node);
+ for (Node* dst : dstnodes)
+ graph->AddEdge(node, dst);
+ return node;
+}
- // share Params for layers generated from the same origin layer due to
- // data partition
- for(auto & entry: share_conf_layers){
- auto layers= entry.second;
- auto owner=layers.begin();
- auto owner_confs=(*owner)->GetParams();
- for(auto it=owner+1; it!=layers.end();it++){
- auto params=(*it)->GetParams();
- CHECK_EQ(params.size(), owner_confs.size());
- for(size_t i=0;i<params.size();i++)
- params.at(i)->ShareData(owner_confs.at(i));
- }
- }
- LOG(INFO)<<"network graph after partition layers\n"<<ToString();
+// add a pair of nodes for BridgeSrcLayer and BridgeDstLayer between srcnode
+// and dstnode
+void BridgeNodes(Graph* graph, Node* srcnode, Node* dstnode) {
+ string sname = srcnode->name + ":-";
+ LayerProto *sproto = new LayerProto();
+ sproto->set_name(sname);
+ sproto->set_type(LayerProto_LayerType_kBridgeSrc);
+ sproto->set_partition_id(
+ static_cast<LayerProto*>(srcnode->proto)->partition_id());
+ auto sbridge = new Node(sname, "##" + sname, sproto->partition_id(), sproto);
+ string dname = "-:" + dstnode->name;
+ LayerProto *dproto = new LayerProto();
+ dproto->set_name(dname);
+ dproto->set_type(LayerProto_LayerType_kBridgeDst);
+ dproto->set_partition_id(
+ static_cast<LayerProto*>(dstnode->proto)->partition_id());
+ auto dbridge = new Node(dname, "##" + dname, dproto->partition_id(), dproto);
+ graph->AddNode(sbridge);
+ graph->AddNode(dbridge);
+ graph->AddEdge(srcnode, sbridge);
+ graph->AddEdge(sbridge, dbridge);
+ graph->AddEdge(dbridge, dstnode);
}
-Graph NeuralNet::CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers,
- const map<string, shared_ptr<Layer>>& name2layer){
- Graph graph;
- // partition origin nodes/layers
- map<string, vector<SNode>> layer2nodes; //from name of original layer to nodes
- int gsize=group_size_;
- for(const auto& layer: layers){
- vector<SNode> nodes;
- if(layer->partition_type()==kDataPartition||
- layer->partition_type()==kLayerPartition){
+Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) {
+ Graph *graph = new Graph();
+ // from name of original layer to nodes
+ map<string, vector<Node*>> name2nodes;
+ map<string, const LayerProto*> name2proto;
+ for (const auto& layer : netproto.layer()) {
+ vector<Node*> nodes;
+ int pdim = layer.partition_dim();
+ if (pdim == 0 || pdim == 1) {
char suffix[4];
- for(int i=0;i<gsize;i++){
- sprintf(suffix, "%02d", i);
+ for (int i = 0; i < npartitions; i++) {
+ LayerProto *proto = new LayerProto(layer);
+ snprintf(suffix, sizeof(suffix), "%02d", i);
// differentiate partitions
- string nodename=layer->name()+"@"+string(suffix);
- auto node=graph.AddNode(nodename, LayerInfo{layer->name(), i,-1,-1});
+ string nodename = layer.name() + "@" + string(suffix);
+ proto->set_partition_id(i);
+ auto node = new Node(nodename, layer.name(), i, proto);
+ graph->AddNode(node);
nodes.push_back(node);
}
- }else if(layer->partition_type()==kNone){
- auto node=graph.AddNode(layer->name(),
- LayerInfo{layer->name(), 0,-1,-1});
+ } else if (pdim == -1) {
+ LayerProto *proto = new LayerProto(layer);
+ auto node = new Node(layer.name(), layer.name(), 0, proto);
+ graph->AddNode(node);
nodes.push_back(node);
- }else{
- LOG(FATAL)<<"Unknown partition type "<<layer->partition_type();
+ } else {
+ LOG(FATAL) << "Cannot partition layer (" << layer.name() <<") on dim: "
+ << layer.partition_dim();
}
- layer2nodes[layer->name()]=nodes;
+ name2nodes[layer.name()] = nodes;
+ name2proto[layer.name()] = &layer;
}
- // connect nodes, nodes for ConcateLayer and SliceLayer are added.
- for(shared_ptr<Layer> layer: layers){
- string name=layer->name();
- PartitionType type=layer->partition_type();
- const vector<SNode>& nodes=layer2nodes.at(name);
- for(int srcid=0;srcid<layer->srclayers_size();srcid++){
- shared_ptr<Layer> srclayer=layer->srclayers()[srcid];
- string srcname=srclayer->name();
- const vector<SNode> srcnodes=layer2nodes.at(srcname);
- PartitionType srctype=srclayer->partition_type();
- ConnectionType connection=layer->connection_type(srcid);
- if(srctype==kNone){
- CHECK_EQ(srcnodes.size(),1)
- <<"local layer "<<srcname<<" should not be partitioned";
- SNode srcnode=srcnodes[0];
- if(type==kDataPartition||(type==kLayerPartition&&connection==kOneToOne)){
- LayerInfo info=srcnode->val();
- info.slice_dimension=name2layer.at(name)->partition_dimension();
- graph.InsertSliceNode(srcnode, nodes, info);
- } else if(type==kNone){
- CHECK_EQ(nodes.size(),1)
- <<"local layer "<<name<<" should not be nodeed";
- graph.AddEdge(srcnode, nodes[0]);
- } else { // type==kLayerPartition&&connection==kOneToAll
- graph.InsertSplitNode(srcnode, nodes);
- }
- }else if((type==kNone
- &&(srctype==kDataPartition||srctype==kLayerPartition))
- ||(type==kLayerPartition&&connection==kOneToAll&&
- (srctype==kDataPartition||srctype==kLayerPartition))){
+ // connect nodes, nodes for ConcateLayer, SliceLayer and SplitLayer are added.
+ auto* factory = Singleton<Factory<Layer>>::Instance();
+ for (const auto& layerproto : netproto.layer()) {
+ string name = layerproto.name();
+ int pdim = layerproto.partition_dim();
+ const vector<Node*>& nodes = name2nodes.at(name);
+ for (auto srcname : layerproto.srclayers()) {
+ const vector<Node*>& srcnodes = name2nodes.at(srcname);
+ // TODO(wangwei): consider the type of each connection
+ auto *layer = factory->Create(layerproto.type());
+ ConnectionType connection = layer->src_neuron_connection(0);
+ delete layer;
+ int src_pdim = name2proto[srcname]->partition_dim();
+ // no partition of src layer
+ if (src_pdim == -1) {
+ Node* srcnode = srcnodes[0];
+ if (pdim == 0 || (pdim == 1 && connection == kOneToOne))
+ SliceNode(graph, srcnode, nodes, true);
+ else if (pdim == -1)
+ graph->AddEdge(srcnode, nodes[0]);
+ else // pdim == 1 && connection == kOneToAll
+ SplitNode(graph, srcnode, nodes);
+ } else if ((pdim == -1 && (src_pdim == 0 || src_pdim == 1))
+ ||(pdim == 1 && connection == kOneToAll && src_pdim == 0)) {
// copy/concate the whole srclayer for every dst partition
- for(SNode node:nodes){
- LayerInfo info=node->val();
- info.concate_dimension=name2layer.at(srcname)->partition_dimension();
- CHECK_GE(info.concate_dimension,0);
- graph.InsertConcateNode(srcnodes, node, info);
- }
- }else if((srctype==kLayerPartition&&type==kDataPartition)
- || (srctype==kDataPartition&&type==kLayerPartition)){
+ for (Node* node : nodes)
+ ConcateNodes(graph, srcnodes, node);
+ } else if ((src_pdim == 1 && pdim == 0) || (src_pdim == 0 && pdim == 1)) {
// the most complex scenario
- vector<SNode> slicenodes;
- for(SNode srcnode: srcnodes){
- LayerInfo info=srcnode->val();
- info.slice_dimension=name2layer.at(name)->partition_dimension();
- slicenodes.push_back(graph.InsertSliceNode(srcnode, nodes,
- info, false));
- }
- for(SNode node: nodes){
- LayerInfo info=node->val();
- info.concate_dimension=name2layer.at(srcname)->partition_dimension();
- CHECK_GE(info.concate_dimension,0);
- graph.InsertConcateNode(slicenodes, node, info);
- }
- }else if((srctype==kDataPartition&&type==kDataPartition)||
- (srctype==kLayerPartition&&type==kLayerPartition&&
- layer->connection_type(srcid)==kOneToOne)){
+ vector<Node*> slicenodes;
+ for (Node* srcnode : srcnodes)
+ slicenodes.push_back(SliceNode(graph, srcnode, nodes, false));
+ for (Node* node : nodes)
+ ConcateNodes(graph, slicenodes, node);
+ } else if ((src_pdim == 0 && pdim == 0)||
+ (src_pdim == 1 && pdim == 1 && connection == kOneToOne)) {
CHECK_EQ(srcnodes.size(), nodes.size());
- for(size_t i=0;i<srcnodes.size();i++){
- graph.AddEdge(srcnodes[i], nodes[i]);
- }
+ for (size_t i = 0; i < srcnodes.size(); i++)
+ graph->AddEdge(srcnodes[i], nodes[i]);
}
}
}
// must do topology sort, because we have added new nodes.
- graph.Sort();
- //LOG(ERROR)<<graph.ToString();
+ graph->Sort();
- // add node for split layer
- bool data_node=true;
- vector<SNode> oldnodes=graph.nodes();
- for(SNode node: oldnodes){
- if(node->dstnodes_size()>1&&node->val().origin!="kSlice"
- &&node->val().origin!="kSplit"&&!data_node){
- vector<SNode> dstnodes=node->dstnodes();
- for(SNode dst: dstnodes)
- graph.RemoveEdge(node, dst);
- graph.InsertSplitNode(node, dstnodes);
+ // add nodes for SplitLayer
+ vector<Node*> oldnodes = graph->nodes();
+ for (Node* node : oldnodes) {
+ auto layer = factory->Create(static_cast<LayerProto*>(node->proto)->type());
+ if (node->dstnodes.size() > 1
+ && layer->dst_layer_connection() == kOneToOne) {
+ vector<Node*> dstnodes = node->dstnodes;
+ for (Node* dst : dstnodes)
+ graph->RemoveEdge(node, dst);
+ SplitNode(graph, node, dstnodes);
}
- data_node=false;
+ delete layer;
}
- // add bridge
- oldnodes=graph.nodes();
- for(SNode node: oldnodes){
- vector<SNode> dstnodes=node->dstnodes();
- for(size_t i=0;i<dstnodes.size();i++){
- SNode dstnode=dstnodes.at(i);
- if(node->val().partitionid!=dstnode->val().partitionid){
- graph.RemoveEdge(node, dstnode);
- graph.InsertBridgeNode(node, dstnode);
+ // add nodes for bridge layers
+ for (Node* node : oldnodes) {
+ vector<Node*> dstnodes = node->dstnodes;
+ auto pid1 = static_cast<LayerProto*>(node->proto)->partition_id();
+ for (size_t i = 0; i < dstnodes.size(); i++) {
+ Node* dstnode = dstnodes.at(i);
+ auto pid2 = static_cast<LayerProto*>(dstnode->proto)->partition_id();
+ if (pid1 != pid2) {
+ graph->RemoveEdge(node, dstnode);
+ BridgeNodes(graph, node, dstnode);
}
}
}
- graph.Sort();
+ graph->Sort();
+ DLOG(INFO) << "Pure graph structure\n" << graph->ToJson();
return graph;
}
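An illustrative trace of CreateGraph (not from the patch): with npartitions = 2,
an unpartitioned layer "data" (partition_dim -1) feeding a layer "fc" with
partition_dim 1 over a kOneToOne connection yields

    data  -> data<     (SliceLayer, slice_dim 1)
    data< -> fc@00
    data< -> fc@01

The punctuation suffixes mark auto-inserted layers: name+"<" slices a node's
output, ">"+name concatenates into a node, name+"+" splits, and ":-"/"-:" are
the bridge endpoints. After sorting, any edge that still crosses partition ids
is rewritten through a BridgeSrc/BridgeDst pair.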
-std::string NeuralNet::ToString(){
- map<string, string> info;
- for(auto layer: layers_){
- info[layer->name()]=IntVecToString(layer->shape(nullptr));
- }
- return graph_.ToString(info);
-}
-
-std::string NeuralNet::ToAdjacency(){
- string disp="";
- for(auto& layer: layers_){
- disp+=layer->name()+": ";
- for(const auto& dst: layer->dstlayers())
- disp+=dst->name()+", ";
- disp+="\n";
- }
- return disp;
-}
-
-void NeuralNet::ToProto(NetProto *proto, bool copyData) {
- proto->clear_layer();
-}
+void NeuralNet::PrepareDataStructures() {
+ parserlayers_.clear();
+ losslayers_.clear();
+ datalayers_.clear();
+ params_.clear();
+ paramid2param_.clear();
+ name2layer_.clear();
-string NeuralNet::DebugInfo(){
- string ret;
- char display[4096];
- for(auto& layer: layers_){
- if(!layer->is_datalayer()){
- sprintf(display, "Forward layer %10s data norm1 %13.9f\n",
- layer->name().c_str(), layer->data(nullptr).asum_data());
- ret+=string(display);
- }
- }
- for (auto it = layers_.rbegin(); it != layers_.rend(); it++){
- shared_ptr<Layer> layer=*it;
- if(!(layer->is_datalayer()||layer->is_losslayer()||layer->is_parserlayer())){
- sprintf(display, "Backward layer %10s grad norm1 %13.9f\n",
- layer->name().c_str(), layer->grad(nullptr).asum_data());
- ret+=string(display);
+ for (auto& layer : layers_) {
+ name2layer_[layer->name()] = layer;
+ if (layer->is_parserlayer())
+ parserlayers_.push_back(static_cast<ParserLayer*>(layer));
+ if (layer->is_losslayer())
+ losslayers_.push_back(static_cast<LossLayer*>(layer));
+ if (layer->is_datalayer())
+ datalayers_.push_back(static_cast<DataLayer*>(layer));
+ for (Param* p : layer->GetParams()) {
+ paramid2param_[p->id()] = p;
+ params_.push_back(p);
}
}
- for(auto& layer: layers_){
- for(auto param: layer->GetParams()){
- sprintf(display, "Layer %10s, param id %2d, name %10s,\
- value norm1 %13.9f, grad norm1 %13.9f\n",
- layer->name().c_str(), param->id(), param->name().c_str(),
- param->data().asum_data(), param->grad().asum_data());
- ret+=string(display);
- }
+}
+std::string NeuralNet::ToAdjacency() {
+ string disp = "";
+ for (auto& layer : layers_) {
+ disp += layer->name()+": ";
+ for (const auto& dst : layer->dstlayers())
+ disp += dst->name()+", ";
+ disp += "\n";
}
- return ret;
+ return disp;
}
-void NeuralNet::ShareParams(shared_ptr<NeuralNet> other, int flag){
- for(auto& layer: layers_){
- auto otherlayer=other->name2layer(layer->name());
- if(otherlayer!=nullptr){
- const auto& otherparams=otherlayer->GetParams();
- const auto& params=layer->GetParams();
+
+void NeuralNet::ShareParams(shared_ptr<NeuralNet> other) {
+ for (auto& layer : layers_) {
+ auto otherlayer = other->name2layer(layer->name());
+ if (otherlayer != nullptr) {
+ const auto& otherparams = otherlayer->GetParams();
+ const auto& params = layer->GetParams();
CHECK_EQ(params.size(), otherparams.size());
- for(size_t i=0;i<params.size();i++){
+ for (size_t i = 0; i < params.size(); i++) {
params[i]->ShareData(otherparams[i]);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/proto/common.proto
----------------------------------------------------------------------
diff --git a/src/proto/common.proto b/src/proto/common.proto
index 70b743c..256206c 100644
--- a/src/proto/common.proto
+++ b/src/proto/common.proto
@@ -38,6 +38,7 @@ message BlobProtos {
enum ConnectionType {
kOneToOne = 0;
kOneToAll = 1;
+ kOneToMany = 2;
}
// to import caffe's lmdb dataset
@@ -79,3 +80,9 @@ message SingleLabelImageRecord {
optional bytes pixel = 3;
repeated float data = 4;
}
+
+message MetricProto {
+ repeated string name = 1;
+ repeated int32 count = 2;
+ repeated float val = 3;
+}
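The Metric class consumed by the new ComputeFeature(Phase, Metric*) signature
is not shown in this diff; a minimal sketch of an accumulator matching
MetricProto's parallel arrays (an assumption, the real class may differ):

    #include <map>
    #include <string>
    #include <utility>
    #include "proto/common.pb.h"

    class Metric {
     public:
      // e.g. perf->Add("loss", loss * scale_ / batchsize_), as in layer.cc
      void Add(const std::string& name, float value) {
        auto& entry = entries_[name];
        entry.first += 1;       // observation count
        entry.second += value;  // running sum
      }
      void ToProto(singa::MetricProto* proto) const {
        for (const auto& kv : entries_) {
          proto->add_name(kv.first);
          proto->add_count(kv.second.first);
          proto->add_val(kv.second.second);
        }
      }
     private:
      std::map<std::string, std::pair<int, float>> entries_;
    };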
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/proto/model.proto
----------------------------------------------------------------------
diff --git a/src/proto/model.proto b/src/proto/model.proto
index 4256491..a8de5d5 100644
--- a/src/proto/model.proto
+++ b/src/proto/model.proto
@@ -7,6 +7,8 @@ enum Phase {
kPositive = 3;
// negative phase for contrastive divergence algorithm
kNegative = 4;
+ kForward = 5;
+ kBackward = 6;
}
message ModelProto {
@@ -58,7 +60,7 @@ message ModelProto {
message NetProto {
repeated LayerProto layer = 1;
// partitioning type for parallelism
- optional PartitionType partition_type = 3 [default = kNone];
+ optional int32 partition_dim = 2 [default = -1];
}
// weight matrix should be defined before bias vector
@@ -99,7 +101,7 @@ message ParamProto {
// multiplied on the global weight decay.
optional float weight_decay_multiplier = 16 [default = 1];
// partition dimension, -1 for no partition
- optional int32 partition_dim = 30 [default = -1];
+ optional int32 partition_dim = 30;
// usually, the program will infer the param shape
repeated int32 shape = 31;
@@ -185,15 +187,15 @@ message LayerProto {
optional SplitProto split_conf = 42;
// configuration for tanh layer
optional TanhProto tanh_conf = 43;
- // partition type which overrides the partition type for neural net
- optional PartitionType partition_type = 59;
+
+
+ // overrides the partition dimension for neural net
+ optional int32 partition_dim = 59 [default = -1];
optional string datablob = 58 [default = "unknow"];
// names of parameters shared from other layers
repeated string share_param = 60;
- // TODO(wangwei): make location ID an array
- optional int32 locationid = 61 [default = 0];
- optional int32 partitionid = 62 [default = 0];
+ optional int32 partition_id = 62 [default = 0];
}
message RGBImageProto {
@@ -246,9 +248,7 @@ message ConvolutionProto {
message ConcateProto {
// on which dimension, starts from 0
- required int32 concate_dimension = 1;
- // concatenate offset
- optional int32 concate_num = 30;
+ required int32 concate_dim = 1;
}
message DataProto {
@@ -328,8 +328,7 @@ message PoolingProto {
}
message SliceProto{
- required int32 slice_dimension=1;
- required int32 slice_num=2;
+ required int32 slice_dim = 1;
}
message ReLUProto {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 42d6a79..cbb0ee1 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -36,7 +36,7 @@ void Server::Run(){
ping->add_frame("PING", 4);
ping->set_type(kConnect);
dealer_->Send(&ping);
- vector<shared_ptr<Param>> master_params;
+ vector<Param*> master_params;
size_t syncEntry=0;
//start recv loop and process requests
while (true){
@@ -121,13 +121,13 @@ void Server::Run(){
Msg* Server::HandlePut(Msg **msg){
int version=(*msg)->trgt_third();
int pid=(*msg)->trgt_second();
- shared_ptr<Param> param=nullptr;
+ Param* param=nullptr;
if(shard_->find(pid)!=shard_->end()){
LOG(ERROR)<<"Param ("<<pid<<") is put more than once";
param=shard_->at(pid);
}else{
auto factory=Singleton<Factory<Param>>::Instance();
- param=shared_ptr<Param>(factory ->Create("Param"));
+ param=factory ->Create("Param");
(*shard_)[pid]=param;
}
auto response=param->HandlePutMsg(msg);
@@ -147,7 +147,7 @@ Msg* Server::HandlePut(Msg **msg){
return response;
}
-Msg* Server::HandleGet(shared_ptr<Param> param, Msg **msg){
+Msg* Server::HandleGet(Param* param, Msg **msg){
if(param->version()<(*msg)->trgt_third())
return *msg;
else{
@@ -158,7 +158,7 @@ Msg* Server::HandleGet(shared_ptr<Param> param, Msg **msg){
}
}
-Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) {
+Msg* Server::HandleUpdate(Param* param, Msg **msg) {
auto* tmp=static_cast<Msg*>((*msg)->CopyAddr());
tmp->SwapAddr();
int paramid=(*msg)->trgt_first();
@@ -174,7 +174,7 @@ Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) {
return response;
}
-Msg* Server::HandleSyncRequest(shared_ptr<Param> param, Msg **msg){
+Msg* Server::HandleSyncRequest(Param* param, Msg **msg){
Msg* response=nullptr;
auto shape=Shape1(param->size());
CHECK_EQ((*msg)->frame_size(), param->size()*sizeof(float));
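The server-side hunks above replace shared_ptr&lt;Param&gt; with raw Param*
allocated by Factory&lt;Param&gt;, so the shard map now carries ownership. A
minimal sketch of the resulting lifetime contract; the teardown loop is an
assumption about the owner of shard_, not something this commit shows:

// Put: the factory allocates, the shard map takes ownership
// (condensed from the HandlePut hunk above).
Param* param = Singleton<Factory<Param>>::Instance()->Create("Param");
(*shard_)[pid] = param;

// Assumed teardown (hypothetical): whoever owns shard_ must eventually
// free each stored Param exactly once.
for (auto& kv : *shard_)
  delete kv.second;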
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index ce135cc..f4e52a6 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -7,14 +7,15 @@
#include "proto/common.pb.h"
#include "trainer/trainer.h"
#include "mshadow/tensor.h"
+
+namespace singa {
using std::vector;
using std::map;
using namespace std::chrono;
+using std::make_shared;
typedef std::chrono::milliseconds TimeT;
-namespace singa {
-
void Trainer::RegisterDefaultClasses(const singa::ModelProto& proto){
// register all layers appearing in the neural net
singa::NeuralNet::RegisterLayers();
@@ -33,8 +34,8 @@ void HandleWorkerFinish(void * ctx){
hctx->dealer->Send(&msg);
}
-const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
- const vector<shared_ptr<Param>>& params){
+const std::unordered_map<int, vector<std::pair<int, int>>>
+SliceParams(int num, const vector<Param*>& params){
std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
if (num==0)
return paramid2slices;
@@ -114,15 +115,15 @@ const vector<int> PartitionSlice(int num, const vector<int>& slices){
previd=slice2box[i];
} else
disp+=" "+std::to_string(slices[i]);
- LOG(INFO)<<"partition slice (av ="<<avg<<", num="<<num<<"):"<<disp;
+ LOG(INFO)<<"partition slice (avg ="<<avg<<", num="<<num<<"):"<<disp;
return slice2box;
}
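PartitionSlice(num, slices) maps each parameter slice to one of num boxes
(servers in a group, per the call in Trainer::Start below), balancing total
size around the logged average. An illustrative call under assumed inputs;
the exact assignment rule lives in the body elided from this hunk:

// Hypothetical example: 2 servers per group, five slices of the given
// sizes; a balanced outcome would be slice2box = {0, 0, 1, 1, 1},
// putting 200 floats on each server (avg = 200).
std::vector<int> slice2box = PartitionSlice(2, {100, 100, 50, 50, 100});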
-vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads,
+vector<Server*> Trainer::CreateServers(int nthreads,
const ModelProto & mproto,
const vector<int> slices,
vector<HandleContext*>* ctx){
auto cluster=Cluster::Get();
- vector<shared_ptr<Server>> servers;
+ vector<Server*> servers;
if(!cluster->has_server())
return servers;
@@ -139,7 +140,7 @@ vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads,
auto dealer=make_shared<Dealer>();
dealer->Connect(kInprocRouterEndpoint);
for(int sid=start;sid<end;sid++){
- auto server=make_shared<Server>(nthreads++, gid, sid);
+ auto server=new Server(nthreads++, gid, sid);
server->Setup(mproto.updater(), server_shard_, slice2group);
servers.push_back(server);
auto *hc=new HandleContext{dealer, gid, sid};
@@ -151,20 +152,20 @@ vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads,
return servers;
}
-vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads,
+vector<Worker*> Trainer::CreateWorkers(int nthreads,
const ModelProto& mproto, vector<int> *slice_size){
auto cluster=Cluster::Get();
- auto net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain,
+ auto net=NeuralNet::Create(mproto.neuralnet(), kTrain,
cluster->nworkers_per_group());
int lcm=LeastCommonMultiple(cluster->nserver_groups(), cluster->nservers_per_group());
auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size
for(auto param: net->params()){
- if(param->id()==param->owner())
+ if(param->id() == param->owner())
for(auto entry: paramid2slices[param->id()])
slice_size->push_back(entry.second);
}
- vector<shared_ptr<Worker>> workers;
+ vector<Worker*> workers;
if(!cluster->has_worker())
return workers;
//LOG(ERROR)<<net->ToString();
@@ -191,33 +192,33 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads,
if(gid==gstart)
train_net=net;
else{
- train_net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain,
+ train_net=NeuralNet::Create(mproto.neuralnet(), kTrain,
cluster->nworkers_per_group());
// the train net for other groups may share parameter values from the
// first group
if(cluster->share_memory())
- train_net->ShareParams(net, kValueOnly);
+ train_net->ShareParams(net);
}
if(gid==0){
// validation and test are performed only by the first group
if(mproto.test_steps()){
- test_net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTest,
+ test_net=NeuralNet::Create(mproto.neuralnet(), kTest,
cluster->nworkers_per_group());
if(test_net!=nullptr)
- test_net->ShareParams(train_net, kValueOnly);
+ test_net->ShareParams(train_net);
}
if(mproto.validation_steps()){
- validation_net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kValidation,
+ validation_net=NeuralNet::Create(mproto.neuralnet(), kValidation,
cluster->nworkers_per_group());
if(validation_net!=nullptr)
- validation_net->ShareParams(train_net, kValueOnly);
+ validation_net->ShareParams(train_net);
}
}
// create ServerShard for the workers
auto shard=make_shared<WorkerShard>();
worker_shards_[gid]=shard;
for(auto layer: train_net->layers()){
- int procsid=cluster->ProcsIDOf(gid, layer->partitionid(), kWorkerLayer);
+ int procsid=cluster->ProcsIDOf(gid, layer->partition_id(), kWorkerLayer);
bool local=procsid==cluster->procs_id();
for(auto param: layer->GetParams()){
for(auto entry :paramid2slices[param->owner()]){
@@ -232,9 +233,9 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads,
}
}
for(int wid=wstart;wid<wend;wid++){
- shared_ptr<Worker> worker=nullptr;
+ Worker* worker=nullptr;
if(mproto.alg()==ModelProto_GradCalcAlg_kBackPropagation)
- worker=make_shared<BPWorker>(nthreads++,gid, wid);
+ worker = new BPWorker(nthreads++,gid, wid);
else{
// TODO add CDWorker
}
@@ -267,13 +268,13 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
int nthreads=1;
// create workers
vector<int> slices;
- vector<shared_ptr<Worker>> workers=CreateWorkers(nthreads, mproto, &slices);
+ vector<Worker*> workers=CreateWorkers(nthreads, mproto, &slices);
if(cluster->nserver_groups()&&cluster->nservers_per_group())
slice2server_=PartitionSlice(cluster->nservers_per_group(), slices);
nthreads+=workers.size();
// create servers
vector<HandleContext*> ctx;
- vector<shared_ptr<Server>> servers=CreateServers(nthreads, mproto, slices,
+ vector<Server*> servers=CreateServers(nthreads, mproto, slices,
&ctx);
#ifdef USE_MPI
@@ -283,14 +284,18 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
#endif
vector<std::thread> threads;
for(auto server: servers)
- threads.push_back(std::thread(&Server::Run,server.get()));
+ threads.push_back(std::thread(&Server::Run,server));
for(auto worker: workers)
- threads.push_back(std::thread(&Worker::Run,worker.get()));
+ threads.push_back(std::thread(&Worker::Run,worker));
Run(workers, servers);
for(auto& thread: threads)
thread.join();
for(auto x: ctx)
delete x;
+ for(auto x : servers)
+ delete x;
+ for(auto x : workers)
+ delete x;
}
inline int bandwidth(int bytes, system_clock::time_point start){
@@ -299,8 +304,8 @@ inline int bandwidth(int bytes, system_clock::time_point start){
return static_cast<int>(bytes*1000.f/duration.count());
}
-void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
- const vector<shared_ptr<Server>>& servers){
+void Trainer::Run(const vector<Worker*>& workers,
+ const vector<Server*>& servers){
auto cluster=Cluster::Get();
procs_id_=cluster->procs_id();
LOG(INFO)<<"Stub in process "<<procs_id_<<" starts";
@@ -364,8 +369,8 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
string prefix((char*)msg->frame_data(), msg->frame_size());
msg->next_frame();
Metric cur;
- cur.ParseString(string((char*)msg->frame_data(), msg->frame_size()));
- LOG(ERROR)<<prefix<<" step-" <<step<<", "<<cur.ToString();
+ cur.ParseFrom(string((char*)msg->frame_data(), msg->frame_size()));
+ LOG(ERROR)<<prefix<<" step-" <<step<<", "<<cur.ToLogString();
}
DeleteMsg(&msg);
}else if(cluster->nserver_groups()>0){
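Trainer now owns its Worker and Server objects through raw pointers.
Condensed from the Start() hunks above, the lifecycle contract is: spawn
threads on the raw pointers, join them all, and only then delete:

std::vector<std::thread> threads;
for (auto* server : servers)
  threads.push_back(std::thread(&Server::Run, server));
for (auto* worker : workers)
  threads.push_back(std::thread(&Worker::Run, worker));
for (auto& t : threads)
  t.join();                        // every Run() loop has returned here
for (auto* s : servers) delete s;  // safe only after the joins
for (auto* w : workers) delete w;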
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index a92ba2c..80a6283 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -8,8 +8,10 @@
#include "utils/factory.h"
#include "trainer/worker.h"
#include "proto/model.pb.h"
-using std::thread;
namespace singa {
+using std::thread;
+using std::make_shared;
+
Worker::Worker(int thread_id, int group_id, int worker_id):
thread_id_(thread_id), group_id_(group_id), worker_id_(worker_id){
}
@@ -52,7 +54,7 @@ void Worker::Run(){
dealer_=make_shared<Dealer>(2*thread_id_);
ConnectStub(dealer_, kWorkerParam);
for(auto layer: train_net_->layers())
- if(layer->partitionid()==worker_id_)
+ if(layer->partition_id()==worker_id_)
if(layer->is_bridgedstlayer()||layer->is_bridgesrclayer()){
layer_dealer_=make_shared<Dealer>(2*thread_id_+1);
ConnectStub(layer_dealer_, kWorkerLayer);
@@ -61,7 +63,7 @@ void Worker::Run(){
step_=modelproto_.step();
// init params
for(auto layer: train_net_->layers()){
- if(layer->partitionid()==worker_id_)
+ if(layer->partition_id()==worker_id_)
for(auto param: layer->GetParams()){
// only owners fill the memory of parameter values.
// others share the memory with owners hence do not need to put/get.
@@ -79,7 +81,7 @@ void Worker::Run(){
for(step_=0;step_<modelproto_.warmup_steps();step_++)
RunOneBatch(step_, &perf);
for(auto layer: train_net_->layers()){
- if(layer->partitionid()==worker_id_)
+ if(layer->partition_id()==worker_id_)
for(auto param: layer->GetParams())
if(param->owner()==param->id())
Put(param, step_);
@@ -107,7 +109,7 @@ void Worker::Stop(){
msg->set_type(kStop);
dealer_->Send(&msg); // use param dealer to send the stop msg
}
-int Worker::Put(shared_ptr<Param> param, int step){
+int Worker::Put(Param* param, int step){
Msg* msg=new Msg();
msg->set_src(group_id_, worker_id_, kWorkerParam);
msg->set_dst(-1, -1, kStub);
@@ -116,7 +118,7 @@ int Worker::Put(shared_ptr<Param> param, int step){
dealer_->Send(&msg);
return 1;
}
-int Worker::Get(shared_ptr<Param> param, int step){
+int Worker::Get(Param* param, int step){
Msg* msg=new Msg();
msg->set_src(group_id_, worker_id_, kWorkerParam);
msg->set_dst(-1, -1, kStub);
@@ -125,7 +127,7 @@ int Worker::Get(shared_ptr<Param> param, int step){
dealer_->Send(&msg);
return 1;
}
-int Worker::Update(shared_ptr<Param> param, int step){
+int Worker::Update(Param* param, int step){
param->set_local_version(param->version());
if(updater_){
updater_->Update(step, param);
@@ -144,30 +146,29 @@ int Worker::Update(shared_ptr<Param> param, int step){
int Worker::CollectAll(shared_ptr<NeuralNet> net, int step){
auto& layers=net->layers();
for(auto& layer: layers){
- if(layer->partitionid()==worker_id_)
- for(shared_ptr<Param> p: layer->GetParams()){
+ if(layer->partition_id()==worker_id_)
+ for(Param* p: layer->GetParams()){
Collect(p, step);
}
}
return 1;
}
-int Worker::Collect(shared_ptr<Param> param, int step){
+int Worker::Collect(Param* param, int step){
while(param->version()<=param->local_version()){
std::this_thread::sleep_for(std::chrono::milliseconds(kCollectSleepTime));
}
return 1;
}
-const void Worker::DisplayPerformance(const Metric & perf, const string& prefix){
+void Worker::DisplayPerformance(const string& prefix, const Metric & perf) {
Msg* msg=new Msg();
msg->set_src(group_id_, worker_id_, kWorkerParam);
msg->set_dst(-1,-1, kStub);
msg->set_type(kMetric);
msg->set_trgt(step_,0,0);
- const string disp=perf.ToString();
msg->add_frame(prefix.c_str(), prefix.length());
+ const string disp = perf.ToString();
msg->add_frame(disp.c_str(), disp.length());
dealer_->Send(&msg);
- //LOG(ERROR)<<prefix<<" "<<perf.ToString();
}
void Worker::RunOneBatch(int step, Metric* perf){
@@ -184,10 +185,8 @@ void Worker::RunOneBatch(int step, Metric* perf){
TrainOneBatch(step, perf);
//LOG(ERROR)<<"Train "<<step;
if(perf!=nullptr){
- perf->Inc();
if(DisplayNow(step)){
- //perf->Avg();
- DisplayPerformance(*perf, "Train");
+ DisplayPerformance("Train", *perf);
perf->Reset();
}
}
@@ -208,13 +207,12 @@ void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net){
Metric perf;
for(int step=0;step<nsteps;step++){
TestOneBatch(step, phase, net, &perf);
- perf.Inc();
}
//perf.Avg();
if(phase==kValidation)
- DisplayPerformance(perf, "Validation");
+ DisplayPerformance("Validation", perf);
else if (phase==kTest)
- DisplayPerformance(perf, "Test");
+ DisplayPerformance("Test", perf);
}
/****************************BPWorker**********************************/
@@ -223,19 +221,20 @@ BPWorker::BPWorker(int thread_id, int group_id, int worker_id):
Worker(thread_id, group_id, worker_id){
}
-void BPWorker::Forward(int step, Phase phase, shared_ptr<NeuralNet> net){
+void BPWorker::Forward(int step, Phase phase, shared_ptr<NeuralNet> net,
+ Metric* perf){
auto& layers=net->layers();
for(auto& layer: layers){
- if(layer->partitionid()==worker_id_){
+ if(layer->partition_id()==worker_id_){
if(layer->is_bridgedstlayer()){
- auto* dst=static_cast<BridgeDstLayer*>(layer.get());
+ auto* dst=static_cast<BridgeDstLayer*>(layer);
while(!dst->ready()){
auto msg=layer_dealer_->Receive();
CHECK_EQ(msg->src_first(), group_id_);
string name((char*)msg->frame_data(), msg->frame_size());
auto tmp=net->name2layer(name);
CHECK(tmp->is_bridgedstlayer());
- auto* dstlayer=static_cast<BridgeDstLayer*>(tmp.get());
+ auto* dstlayer=static_cast<BridgeDstLayer*>(tmp);
auto data=dstlayer->mutable_data(nullptr);
msg->next_frame();
memcpy(data->mutable_cpu_data(), msg->frame_data(), msg->frame_size());
@@ -244,28 +243,25 @@ void BPWorker::Forward(int step, Phase phase, shared_ptr<NeuralNet> net){
}
}
if(phase==kTrain){
- for(shared_ptr<Param> p: layer->GetParams()){
+ for(Param* p: layer->GetParams()){
Collect(p, step);
}
}
//clock_t s=clock();
- layer->ComputeFeature(phase);
+ layer->ComputeFeature(phase, perf);
//LOG(ERROR)<<layer->name()<<":"<<(clock()-s)*1.0/CLOCKS_PER_SEC;
if(layer->is_bridgesrclayer()){
auto dst=layer->dstlayers().at(0);
Msg *msg=new Msg();
msg->set_src(group_id_, worker_id_, kWorkerLayer);
- msg->set_dst(group_id_, dst->partitionid(), kWorkerLayer);
+ msg->set_dst(group_id_, dst->partition_id(), kWorkerLayer);
msg->add_frame(dst->name().c_str(), dst->name().length());
auto const & blob=layer->data(nullptr);
msg->add_frame(blob.cpu_data(), blob.count()*sizeof(float));
layer_dealer_->Send(&msg);
}
- if(phase==kTrain&&DisplayDebugInfo(step)
- &&layer->mutable_data(nullptr)!=nullptr){
- LOG(INFO)<<StringPrintf("Forward layer %10s data norm1 %13.9f",
- layer->name().c_str(), layer->data(nullptr).asum_data());
- }
+ if(phase == kTrain && DisplayDebugInfo(step))
+ LOG(INFO) << layer->DebugString(step, kForward);
}
}
}
@@ -273,25 +269,17 @@ void BPWorker::Forward(int step, Phase phase, shared_ptr<NeuralNet> net){
void BPWorker::Backward(int step, shared_ptr<NeuralNet> net){
auto& layers=net->layers();
for (auto it = layers.rbegin(); it != layers.rend(); it++){
- shared_ptr<Layer> layer=*it;
- if(layer->partitionid()==worker_id_){
+ Layer* layer=*it;
+ if(layer->partition_id()==worker_id_){
if(layer->is_bridgesrclayer()){
//auto* src=static_cast<BridgeSrcLayer*>(layer.get());
// receive grad blobs
}
- layer->ComputeGradient();
- if(layer->mutable_grad(nullptr)!=nullptr&&DisplayDebugInfo(step)){
- LOG(INFO)<<StringPrintf("Backward layer %10s grad norm1 %13.9f\t",
- layer->name().c_str(), layer->grad(nullptr).asum_data());
- for(shared_ptr<Param> p: layer->GetParams())
- LOG(INFO)<<StringPrintf("param id %2d, name %10s,\
- value norm1 %13.9f, grad norm1 %13.9f",
- p->id(), p->name().c_str(),
- p->data().asum_data(), p->grad().asum_data());
- }
- for(shared_ptr<Param> p: layer->GetParams()){
+ layer->ComputeGradient(kTrain);
+ if(DisplayDebugInfo(step))
+ LOG(INFO) << layer->DebugString(step, kBackward);
+ for(Param* p: layer->GetParams())
Update(p, step);
- }
if(layer->is_bridgedstlayer()){
// send grad blobs
}
@@ -300,38 +288,14 @@ void BPWorker::Backward(int step, shared_ptr<NeuralNet> net){
}
void BPWorker::TrainOneBatch(int step, Metric* perf){
- Forward(step, kTrain, train_net_);
+ Forward(step, kTrain, train_net_, perf);
Backward(step, train_net_);
auto losslayers=train_net_->losslayers();
- for(auto layer: losslayers){
- if(layer->partitionid()==worker_id_){
- const float * ptr=layer->metric().cpu_data();
- /*
- for(int j=0;j<layer->metric().count();j++)
- perf->AddMetric(std::to_string(j)+"#"+layer->name(), ptr[j]);
- */
- // hard code display info
- perf->AddMetric(std::to_string(0)+"#loss", ptr[0]);
- perf->AddMetric(std::to_string(1)+"#accuracy", ptr[1]);
- }
- }
}
-void BPWorker::TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net, Metric* perf){
- Forward(step, phase, net);
- const auto& losslayers=net->losslayers();
- for(auto layer: losslayers){
- if(layer->partitionid()==worker_id_){
- const float * ptr=layer->metric().cpu_data();
- /*
- for(int j=0;j<layer->metric().count();j++)
- perf.AddMetric(std::to_string(j)+"#"+layer->name(), ptr[j]);
- */
- // hard code display info
- perf->AddMetric(std::to_string(0)+"#loss", ptr[0]);
- perf->AddMetric(std::to_string(1)+"#accuracy", ptr[1]);
- }
- }
+void BPWorker::TestOneBatch(int step, Phase phase,
+ shared_ptr<NeuralNet> net, Metric* perf){
+ Forward(step, phase, net, perf);
}
} // namespace singa
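With TrainOneBatch and TestOneBatch above no longer harvesting
layer->metric() by hand, layers are expected to report into the Metric*
threaded through ComputeFeature(phase, perf). A sketch of what a loss layer
might now do; MyLossLayer is hypothetical, and the metric names "loss" and
"accuracy" are carried over from the deleted hard-coded block:

// Hypothetical sketch, not part of this commit.
void MyLossLayer::ComputeFeature(Phase phase, Metric* perf) {
  float loss = 0.f, accuracy = 0.f;
  // ... forward pass computing loss and accuracy (elided) ...
  if (perf != nullptr) {
    perf->Add("loss", loss);          // Metric::Add accumulates count and sum
    perf->Add("accuracy", accuracy);
  }
}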
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/utils/common.cc
----------------------------------------------------------------------
diff --git a/src/utils/common.cc b/src/utils/common.cc
index 67b4486..11a19f8 100644
--- a/src/utils/common.cc
+++ b/src/utils/common.cc
@@ -160,4 +160,52 @@ void SetupLog(const std::string& log_dir, const std::string& model) {
google::SetLogDestination(google::FATAL, fatal.c_str());
}
+void Metric::Add(const string& name, float value) {
+ if(entry_.find(name) == entry_.end())
+ entry_[name] = std::make_pair(1, value);
+ else{
+ auto& e = entry_.at(name);
+ e.first += 1;
+ e.second += value;
+ }
+}
+
+void Metric::Reset() {
+ for(auto& e : entry_) { // by reference, so the stored counters are actually zeroed
+ e.second.first = 0;
+ e.second.second = 0;
+ }
+}
+const string Metric::ToLogString() const{
+ string ret;
+ size_t k = 0;
+ for(const auto& e : entry_) {
+ ret += e.first + " : ";
+ ret += std::to_string(e.second.second / e.second.first);
+ if(++k < entry_.size())
+ ret += ", ";
+ }
+ return ret;
+}
+
+const string Metric::ToString() const{
+ MetricProto proto;
+ for(const auto& e : entry_) {
+ proto.add_name(e.first);
+ proto.add_count(e.second.first);
+ proto.add_val(e.second.second);
+ }
+ string ret;
+ proto.SerializeToString(&ret);
+ return ret;
+}
+
+void Metric::ParseFrom(const string& msg) {
+ MetricProto proto;
+ proto.ParseFromString(msg);
+ Reset();
+ for(int i = 0; i < proto.name_size(); i++) {
+ entry_[proto.name(i)] = std::make_pair(proto.count(i), proto.val(i));
+ }
+}
} // namespace singa
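Taken together, Metric now keeps a (count, sum) pair per name, averages only
when printing, and round-trips through MetricProto (presumably the proto
whose repeated name/count/val tail appears at the top of this mail). A small
usage sketch with illustrative values:

singa::Metric perf;
perf.Add("loss", 0.8f);
perf.Add("loss", 0.6f);              // entry: count = 2, sum = 1.4
LOG(INFO) << perf.ToLogString();     // "loss : 0.700000"

std::string wire = perf.ToString();  // serialized MetricProto
singa::Metric received;
received.ParseFrom(wire);            // counts and sums survive the trip
LOG(INFO) << received.ToLogString(); // same average on the remote side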