You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by di...@apache.org on 2016/04/08 04:38:39 UTC
[2/4] incubator-singa git commit: SINGA-130 Data prefetching layer
SINGA-130 Data prefetching layer
Extended StoreInputLayer to support prefetching of data. It maintains a buffer for (key,value) pairs read from the storage
layer. In Setup(), it launches a new thread that reads data into the buffer. The
ComputeFeature() method waits for the thread to finish (join) before parsing the buffered data into the data_ and aux_data_ fields. Finally, it launches
another thread to prefetch the next batch.
In terms of memory consumption, this prefetching uses an extra (batchsize*recordsize) bytes for the buffer. However, we observe
no visible runtime improvement, as I/O time is very small (on the order of milliseconds without prefetching, and tens of microseconds
with prefetching) compared to CPU time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/a0bdd0b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/a0bdd0b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/a0bdd0b8
Branch: refs/heads/master
Commit: a0bdd0b85ddba7d670ab04c5de04a29c8366e868
Parents: 5f67e57
Author: Anh Dinh <ug...@gmail.com>
Authored: Tue Apr 5 23:14:36 2016 +0800
Committer: Anh Dinh <ug...@gmail.com>
Committed: Thu Apr 7 18:47:55 2016 +0800
----------------------------------------------------------------------
examples/cifar10/job.conf | 2 ++
include/singa/neuralnet/input_layer.h | 18 ++++++++--
include/singa/neuralnet/layer.h | 4 +--
src/neuralnet/input_layer/record.cc | 2 +-
src/neuralnet/input_layer/store.cc | 57 +++++++++++++++++++++---------
src/proto/job.proto | 2 ++
6 files changed, 63 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/examples/cifar10/job.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf
index d20b452..cbb95eb 100644
--- a/examples/cifar10/job.conf
+++ b/examples/cifar10/job.conf
@@ -38,6 +38,7 @@ neuralnet {
shape: 3
shape: 32
shape: 32
+ #prefetching: false
}
include: kTrain
}
@@ -67,6 +68,7 @@ neuralnet {
shape: 3
shape: 32
shape: 32
+ #prefetching: false
}
include: kTest
}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/include/singa/neuralnet/input_layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h
index 73d509b..2e4edd1 100644
--- a/include/singa/neuralnet/input_layer.h
+++ b/include/singa/neuralnet/input_layer.h
@@ -25,6 +25,7 @@
#include <string>
#include <vector>
#include <thread>
+#include <deque>
#include "singa/io/store.h"
#include "singa/io/kvfile.h"
#include "singa/neuralnet/layer.h"
@@ -40,9 +41,15 @@ class StoreInputLayer : virtual public InputLayer {
void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
-
+
protected:
/**
+ * Helper method for doing the prefetching, basically read (key,value) pairs
+ * to buf_keys and buf_vals_ vector of size batchsize_.
+ */
+ void fetch_data();
+
+ /**
* Parsing the (key, val) tuple to get feature (and label).
* Subclasses must implment this function.
* @param[in] k parse this tuple as the k-th instance of one mini-batch.
@@ -54,9 +61,13 @@ class StoreInputLayer : virtual public InputLayer {
virtual bool Parse(int k, int flag, const string& key, const string& val) = 0;
protected:
+
int batchsize_ = 1;
int random_skip_ = 0;
io::Store* store_ = nullptr;
+
+ vector<std::string> buf_keys_, buf_vals_;
+ std::deque<std::thread> threads_; // prefetching thread
};
/**
@@ -68,8 +79,8 @@ class SingleLabelRecordLayer : public StoreInputLayer {
public:
void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
-
- protected:
+ protected:
+
/**
* Load a single record (tuple), e.g., the mean or standard variance vector.
*/
@@ -84,6 +95,7 @@ class SingleLabelRecordLayer : public StoreInputLayer {
* UFLDL</a>
*/
Blob<float> mean_, std_;
+
};
/**
* Specific layer that parses the value string loaded by Store as a line from
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/include/singa/neuralnet/layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/layer.h b/include/singa/neuralnet/layer.h
index ce47b47..c8ea3fc 100644
--- a/include/singa/neuralnet/layer.h
+++ b/include/singa/neuralnet/layer.h
@@ -208,7 +208,7 @@ class Layer {
/**
* @return a const ref for Blob vector storing feature values of this layer.
*/
- virtual const vector<Blob<float>*>& data() const {
+ virtual const vector<Blob<float>*>& data() {
return datavec_;
}
@@ -252,7 +252,7 @@ class Layer {
/**
* @return auxiliary data, e.g., image label.
*/
- virtual const vector<AuxType>& aux_data(const Layer* from = nullptr) const {
+ virtual const vector<AuxType>& aux_data(const Layer* from = nullptr) {
return aux_data_;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/neuralnet/input_layer/record.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/record.cc b/src/neuralnet/input_layer/record.cc
index b14fc80..67d312d 100644
--- a/src/neuralnet/input_layer/record.cc
+++ b/src/neuralnet/input_layer/record.cc
@@ -52,7 +52,7 @@ bool RecordInputLayer::Parse(int k, int flag, const string& key,
int size = data_.count() / batchsize_;
if (image.data_size()) {
CHECK_EQ(size, image.data_size());
- float* ptr = data_.mutable_cpu_data() + k * size;
+ float* ptr = data_.mutable_cpu_data() + k * size;
for (int i = 0; i< size; i++)
ptr[i] = image.data(i);
} else if (image.pixel().size()) {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/neuralnet/input_layer/store.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc
index 3b642ca..ff9eb8d 100644
--- a/src/neuralnet/input_layer/store.cc
+++ b/src/neuralnet/input_layer/store.cc
@@ -22,9 +22,11 @@
#include "singa/neuralnet/input_layer.h"
#include "singa/utils/context.h"
#include "singa/utils/singleton.h"
-
+#include <time.h>
namespace singa {
+using std::thread;
+
StoreInputLayer::~StoreInputLayer() {
if (store_ != nullptr) {
delete store_;
@@ -44,11 +46,21 @@ void StoreInputLayer::Setup(const LayerProto& conf,
} else {
batchsize_ = conf.store_conf().batchsize(0);
}
+
+ vector<int> shape {batchsize_};
+ for (int s : conf.store_conf().shape())
+ shape.push_back(s);
+ data_.Reshape(shape);
+ aux_data_.resize(batchsize_);
+ buf_keys_.resize(batchsize_);
+ buf_vals_.resize(batchsize_);
+
+ // initialize prefetch buffer and start the thread
+ if (conf.store_conf().prefetching())
+ threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
}
-void StoreInputLayer::ComputeFeature(int flag,
- const vector<Layer*>& srclayers) {
- string key, val;
+void StoreInputLayer::fetch_data(){
if (store_ == nullptr) {
store_ = io::OpenStore(layer_conf_.store_conf().backend(),
layer_conf_.store_conf().path(),
@@ -61,6 +73,7 @@ void StoreInputLayer::ComputeFeature(int flag,
random_skip_ = distribution(*generator);
}
+ string key, val;
while (random_skip_ > 0) {
if (!store_->Read(&key, &val)) {
store_->SeekToFirst();
@@ -70,30 +83,43 @@ void StoreInputLayer::ComputeFeature(int flag,
}
}
for (int k = 0; k < batchsize_; k++) {
- if (!store_->Read(&key, &val)) {
+ if (!store_->Read(&buf_keys_[k], &buf_vals_[k])) {
store_->SeekToFirst();
- CHECK(store_->Read(&key, &val));
+ CHECK(store_->Read(&buf_keys_[k], &buf_vals_[k]));
}
- // TODO(wangwei) random skip and shuffle among this mini-batch
- Parse(k, flag, key, val);
}
}
+void StoreInputLayer::ComputeFeature(int flag,
+ const vector<Layer*>& srclayers) {
+ // if prefetching, wait for the thread to finish
+ if (layer_conf_.store_conf().prefetching()){
+ threads_.front().join();
+ threads_.pop_front();
+ }
+ else
+ fetch_data();
+
+ for (int k = 0; k < batchsize_; k++)
+ Parse(k, flag, buf_keys_[k], buf_vals_[k]);
+
+ if (layer_conf_.store_conf().prefetching())
+ threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
+}
+
+
void SingleLabelRecordLayer::Setup(const LayerProto& conf,
const vector<Layer*>& srclayers) {
StoreInputLayer::Setup(conf, srclayers);
- vector<int> shape {batchsize_};
- for (int s : conf.store_conf().shape())
- shape.push_back(s);
- data_.Reshape(shape);
- aux_data_.resize(batchsize_);
}
+
void SingleLabelRecordLayer::ComputeFeature(int flag,
const vector<Layer*>& srclayers) {
- StoreInputLayer::ComputeFeature(flag, srclayers);
+ StoreInputLayer::ComputeFeature(flag, srclayers);
auto& store_conf = layer_conf_.store_conf();
+
if (store_conf.has_mean_file() && mean_.count() == 0) {
mean_.Reshape(vector<int>{data_.count() / batchsize_});
LoadRecord(store_conf.backend(), store_conf.mean_file(), &mean_);
@@ -125,7 +151,7 @@ void SingleLabelRecordLayer::ComputeFeature(int flag,
if (std_.count()) {
const float* std = std_.cpu_data();
for (int k = 0; k < batchsize_; k++) {
- float* dptr = data_.mutable_cpu_data() + k * std_.count();
+ float* dptr = data_.mutable_cpu_data() + k * std_.count();
for (int i = 0; i < std_.count(); i++) {
dptr[i] /= std[i];
}
@@ -133,5 +159,4 @@ void SingleLabelRecordLayer::ComputeFeature(int flag,
}
}
-
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/proto/job.proto
----------------------------------------------------------------------
diff --git a/src/proto/job.proto b/src/proto/job.proto
index 7157a84..0a1f968 100644
--- a/src/proto/job.proto
+++ b/src/proto/job.proto
@@ -380,7 +380,9 @@ message StoreProto {
optional bool encoded = 10 [default = false];
optional int32 random_skip = 11 [default = 0];
optional bool has_label = 12 [default = true];
+ optional bool prefetching = 13 [default = true];
}
+
message CharRNNProto {
optional string path = 1;
optional string vocab_path = 2;