You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by di...@apache.org on 2016/04/08 04:38:39 UTC

[2/4] incubator-singa git commit: SINGA-130 Data prefetching layer

SINGA-130 Data prefetching layer

Extended StoreInputLayer to support prefetching of data. It maintains a buffer for (key,value) pairs read from the storage
layer. In Setup(), it launches a new thread for reading data into the buffer. This thread stores data into the buffer. The
ComputeFeature() method waits for thread to finish (join) before parsing it into data_ and aux_ field. Finally, it launches
another thread.

In terms of memory consumption, this prefetching use extra (batchsize*recordsize) bytes for the buffer. However, we observe
no visible runtime improvement, as I/O time is very small (in order of milliseconds without prefetching, and tens of microsecond
with prefetching) compared to CPU time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/a0bdd0b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/a0bdd0b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/a0bdd0b8

Branch: refs/heads/master
Commit: a0bdd0b85ddba7d670ab04c5de04a29c8366e868
Parents: 5f67e57
Author: Anh Dinh <ug...@gmail.com>
Authored: Tue Apr 5 23:14:36 2016 +0800
Committer: Anh Dinh <ug...@gmail.com>
Committed: Thu Apr 7 18:47:55 2016 +0800

----------------------------------------------------------------------
 examples/cifar10/job.conf             |  2 ++
 include/singa/neuralnet/input_layer.h | 18 ++++++++--
 include/singa/neuralnet/layer.h       |  4 +--
 src/neuralnet/input_layer/record.cc   |  2 +-
 src/neuralnet/input_layer/store.cc    | 57 +++++++++++++++++++++---------
 src/proto/job.proto                   |  2 ++
 6 files changed, 63 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/examples/cifar10/job.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf
index d20b452..cbb95eb 100644
--- a/examples/cifar10/job.conf
+++ b/examples/cifar10/job.conf
@@ -38,6 +38,7 @@ neuralnet {
       shape: 3
       shape: 32
       shape: 32
+      #prefetching: false
     }
     include: kTrain
   }
@@ -67,6 +68,7 @@ neuralnet {
       shape: 3
       shape: 32
       shape: 32
+      #prefetching: false
     }
     include: kTest
   }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/include/singa/neuralnet/input_layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h
index 73d509b..2e4edd1 100644
--- a/include/singa/neuralnet/input_layer.h
+++ b/include/singa/neuralnet/input_layer.h
@@ -25,6 +25,7 @@
 #include <string>
 #include <vector>
 #include <thread>
+#include <deque>
 #include "singa/io/store.h"
 #include "singa/io/kvfile.h"
 #include "singa/neuralnet/layer.h"
@@ -40,9 +41,15 @@ class StoreInputLayer : virtual public InputLayer {
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
 
-
+  
  protected:
   /**
+   * Helper method for doing the prefetching, basically read (key,value) pairs
+   * to buf_keys and buf_vals_ vector of size batchsize_. 
+   */
+  void fetch_data(); 
+
+  /**
    * Parsing the (key, val) tuple to get feature (and label).
    * Subclasses must implment this function.
    * @param[in] k parse this tuple as the k-th instance of one mini-batch.
@@ -54,9 +61,13 @@ class StoreInputLayer : virtual public InputLayer {
   virtual bool Parse(int k, int flag, const string& key, const string& val) = 0;
 
  protected:
+  
   int batchsize_ = 1;
   int random_skip_ = 0;
   io::Store* store_ = nullptr;
+ 
+  vector<std::string> buf_keys_, buf_vals_; 
+  std::deque<std::thread> threads_; // prefetching thread
 };
 
 /**
@@ -68,8 +79,8 @@ class SingleLabelRecordLayer : public StoreInputLayer {
  public:
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
-
- protected:
+  protected:
+  
   /**
    * Load a single record (tuple), e.g., the mean or standard variance vector.
    */
@@ -84,6 +95,7 @@ class SingleLabelRecordLayer : public StoreInputLayer {
    * UFLDL</a>
    */
   Blob<float> mean_, std_;
+  
 };
 /**
  * Specific layer that parses the value string loaded by Store as a line from

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/include/singa/neuralnet/layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/layer.h b/include/singa/neuralnet/layer.h
index ce47b47..c8ea3fc 100644
--- a/include/singa/neuralnet/layer.h
+++ b/include/singa/neuralnet/layer.h
@@ -208,7 +208,7 @@ class Layer {
   /**
    * @return a const ref for Blob vector storing feature values of this layer.
    */
-  virtual const vector<Blob<float>*>& data() const {
+  virtual const vector<Blob<float>*>& data() {
     return datavec_;
   }
 
@@ -252,7 +252,7 @@ class Layer {
   /**
    * @return auxiliary data, e.g., image label.
    */
-  virtual const vector<AuxType>& aux_data(const Layer* from = nullptr) const {
+  virtual const vector<AuxType>& aux_data(const Layer* from = nullptr) {
     return aux_data_;
   }
   /**

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/neuralnet/input_layer/record.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/record.cc b/src/neuralnet/input_layer/record.cc
index b14fc80..67d312d 100644
--- a/src/neuralnet/input_layer/record.cc
+++ b/src/neuralnet/input_layer/record.cc
@@ -52,7 +52,7 @@ bool RecordInputLayer::Parse(int k, int flag, const string& key,
   int size = data_.count() / batchsize_;
   if (image.data_size()) {
     CHECK_EQ(size, image.data_size());
-    float* ptr = data_.mutable_cpu_data() + k * size;
+    float* ptr = data_.mutable_cpu_data() + k * size; 
     for (int i = 0; i< size; i++)
       ptr[i] = image.data(i);
   } else if (image.pixel().size()) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/neuralnet/input_layer/store.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc
index 3b642ca..ff9eb8d 100644
--- a/src/neuralnet/input_layer/store.cc
+++ b/src/neuralnet/input_layer/store.cc
@@ -22,9 +22,11 @@
 #include "singa/neuralnet/input_layer.h"
 #include "singa/utils/context.h"
 #include "singa/utils/singleton.h"
-
+#include <time.h>
 namespace singa {
 
+using std::thread;
+
 StoreInputLayer::~StoreInputLayer() {
   if (store_ != nullptr) {
     delete store_;
@@ -44,11 +46,21 @@ void StoreInputLayer::Setup(const LayerProto& conf,
   } else {
     batchsize_ = conf.store_conf().batchsize(0);
   }
+
+  vector<int> shape {batchsize_};
+  for (int s : conf.store_conf().shape())
+    shape.push_back(s);
+  data_.Reshape(shape);
+  aux_data_.resize(batchsize_);
+  buf_keys_.resize(batchsize_); 
+  buf_vals_.resize(batchsize_); 
+
+  // initialize prefetch buffer and start the thread
+  if (conf.store_conf().prefetching())
+    threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
 }
 
-void StoreInputLayer::ComputeFeature(int flag,
-    const vector<Layer*>& srclayers) {
-  string key, val;
+void StoreInputLayer::fetch_data(){
   if (store_ == nullptr) {
     store_ = io::OpenStore(layer_conf_.store_conf().backend(),
         layer_conf_.store_conf().path(),
@@ -61,6 +73,7 @@ void StoreInputLayer::ComputeFeature(int flag,
       random_skip_ = distribution(*generator);
     }
 
+    string key, val;
     while (random_skip_ > 0) {
       if (!store_->Read(&key, &val)) {
         store_->SeekToFirst();
@@ -70,30 +83,43 @@ void StoreInputLayer::ComputeFeature(int flag,
     }
   }
   for (int k = 0; k < batchsize_; k++) {
-    if (!store_->Read(&key, &val)) {
+    if (!store_->Read(&buf_keys_[k], &buf_vals_[k])) {
       store_->SeekToFirst();
-      CHECK(store_->Read(&key, &val));
+      CHECK(store_->Read(&buf_keys_[k], &buf_vals_[k]));
     }
-    // TODO(wangwei) random skip and shuffle among this mini-batch
-    Parse(k, flag, key, val);
   }
 }
 
+void StoreInputLayer::ComputeFeature(int flag,
+    const vector<Layer*>& srclayers) {
+  // if prefetching, wait for the thread to finish
+  if (layer_conf_.store_conf().prefetching()){
+    threads_.front().join(); 
+    threads_.pop_front(); 
+  }
+  else
+    fetch_data(); 
+
+  for (int k = 0; k < batchsize_; k++) 
+    Parse(k, flag, buf_keys_[k], buf_vals_[k]);
+  
+  if (layer_conf_.store_conf().prefetching())
+    threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
+}
+
+
 void SingleLabelRecordLayer::Setup(const LayerProto& conf,
     const vector<Layer*>& srclayers) {
   StoreInputLayer::Setup(conf, srclayers);
 
-  vector<int> shape {batchsize_};
-  for (int s : conf.store_conf().shape())
-    shape.push_back(s);
-  data_.Reshape(shape);
-  aux_data_.resize(batchsize_);
 }
+
 void SingleLabelRecordLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
-  StoreInputLayer::ComputeFeature(flag, srclayers);
 
+  StoreInputLayer::ComputeFeature(flag, srclayers);
   auto& store_conf = layer_conf_.store_conf();
+
   if (store_conf.has_mean_file() && mean_.count() == 0) {
     mean_.Reshape(vector<int>{data_.count() / batchsize_});
     LoadRecord(store_conf.backend(), store_conf.mean_file(), &mean_);
@@ -125,7 +151,7 @@ void SingleLabelRecordLayer::ComputeFeature(int flag,
   if (std_.count()) {
     const float* std = std_.cpu_data();
     for (int k = 0; k < batchsize_; k++) {
-      float* dptr = data_.mutable_cpu_data() + k * std_.count();
+      float* dptr = data_.mutable_cpu_data() + k * std_.count();  
       for (int i = 0; i < std_.count(); i++) {
         dptr[i] /= std[i];
       }
@@ -133,5 +159,4 @@ void SingleLabelRecordLayer::ComputeFeature(int flag,
   }
 }
 
-
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/proto/job.proto
----------------------------------------------------------------------
diff --git a/src/proto/job.proto b/src/proto/job.proto
index 7157a84..0a1f968 100644
--- a/src/proto/job.proto
+++ b/src/proto/job.proto
@@ -380,7 +380,9 @@ message StoreProto {
   optional bool encoded = 10 [default = false];
   optional int32 random_skip = 11 [default = 0];
   optional bool has_label = 12 [default = true];
+  optional bool prefetching = 13 [default = true];
 }
+
 message CharRNNProto {
   optional string path = 1;
   optional string vocab_path = 2;