You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by di...@apache.org on 2016/04/08 04:38:38 UTC

[1/4] incubator-singa git commit: SINGA-130 Data prefetching

Repository: incubator-singa
Updated Branches:
  refs/heads/master 8329aa0c3 -> cf4be5a86


SINGA-130 Data prefetching

Move fetch_data thread to ComputeFeature()

Pass cpplint check


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4344ab7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4344ab7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4344ab7b

Branch: refs/heads/master
Commit: 4344ab7b2a17711d77465a6c52e50b41cad0dfef
Parents: a0bdd0b
Author: Anh Dinh <ug...@gmail.com>
Authored: Wed Apr 6 11:35:05 2016 +0800
Committer: Anh Dinh <ug...@gmail.com>
Committed: Thu Apr 7 18:47:55 2016 +0800

----------------------------------------------------------------------
 include/singa/neuralnet/input_layer.h | 16 +++++---------
 src/neuralnet/input_layer/record.cc   |  2 +-
 src/neuralnet/input_layer/store.cc    | 34 ++++++++++++------------------
 3 files changed, 20 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/include/singa/neuralnet/input_layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h
index 2e4edd1..0980f35 100644
--- a/include/singa/neuralnet/input_layer.h
+++ b/include/singa/neuralnet/input_layer.h
@@ -25,7 +25,6 @@
 #include <string>
 #include <vector>
 #include <thread>
-#include <deque>
 #include "singa/io/store.h"
 #include "singa/io/kvfile.h"
 #include "singa/neuralnet/layer.h"
@@ -41,14 +40,12 @@ class StoreInputLayer : virtual public InputLayer {
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
 
-  
  protected:
   /**
    * Helper method for doing the prefetching, basically read (key,value) pairs
    * to buf_keys and buf_vals_ vector of size batchsize_. 
    */
-  void fetch_data(); 
-
+  void fetch_data();
   /**
    * Parsing the (key, val) tuple to get feature (and label).
    * Subclasses must implment this function.
@@ -61,13 +58,11 @@ class StoreInputLayer : virtual public InputLayer {
   virtual bool Parse(int k, int flag, const string& key, const string& val) = 0;
 
  protected:
-  
   int batchsize_ = 1;
   int random_skip_ = 0;
   io::Store* store_ = nullptr;
- 
-  vector<std::string> buf_keys_, buf_vals_; 
-  std::deque<std::thread> threads_; // prefetching thread
+  vector<std::string> buf_keys_, buf_vals_;
+  std::thread *thread_ = nullptr;  // prefetching thread
 };
 
 /**
@@ -79,8 +74,8 @@ class SingleLabelRecordLayer : public StoreInputLayer {
  public:
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
-  protected:
-  
+
+ protected:
   /**
    * Load a single record (tuple), e.g., the mean or standard variance vector.
    */
@@ -95,7 +90,6 @@ class SingleLabelRecordLayer : public StoreInputLayer {
    * UFLDL</a>
    */
   Blob<float> mean_, std_;
-  
 };
 /**
  * Specific layer that parses the value string loaded by Store as a line from

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/src/neuralnet/input_layer/record.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/record.cc b/src/neuralnet/input_layer/record.cc
index 67d312d..b14fc80 100644
--- a/src/neuralnet/input_layer/record.cc
+++ b/src/neuralnet/input_layer/record.cc
@@ -52,7 +52,7 @@ bool RecordInputLayer::Parse(int k, int flag, const string& key,
   int size = data_.count() / batchsize_;
   if (image.data_size()) {
     CHECK_EQ(size, image.data_size());
-    float* ptr = data_.mutable_cpu_data() + k * size; 
+    float* ptr = data_.mutable_cpu_data() + k * size;
     for (int i = 0; i< size; i++)
       ptr[i] = image.data(i);
   } else if (image.pixel().size()) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/src/neuralnet/input_layer/store.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc
index ff9eb8d..63b9d05 100644
--- a/src/neuralnet/input_layer/store.cc
+++ b/src/neuralnet/input_layer/store.cc
@@ -22,7 +22,6 @@
 #include "singa/neuralnet/input_layer.h"
 #include "singa/utils/context.h"
 #include "singa/utils/singleton.h"
-#include <time.h>
 namespace singa {
 
 using std::thread;
@@ -52,15 +51,9 @@ void StoreInputLayer::Setup(const LayerProto& conf,
     shape.push_back(s);
   data_.Reshape(shape);
   aux_data_.resize(batchsize_);
-  buf_keys_.resize(batchsize_); 
-  buf_vals_.resize(batchsize_); 
-
-  // initialize prefetch buffer and start the thread
-  if (conf.store_conf().prefetching())
-    threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
 }
 
-void StoreInputLayer::fetch_data(){
+void StoreInputLayer::fetch_data() {
   if (store_ == nullptr) {
     store_ = io::OpenStore(layer_conf_.store_conf().backend(),
         layer_conf_.store_conf().path(),
@@ -93,25 +86,26 @@ void StoreInputLayer::fetch_data(){
 void StoreInputLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
   // if prefetching, wait for the thread to finish
-  if (layer_conf_.store_conf().prefetching()){
-    threads_.front().join(); 
-    threads_.pop_front(); 
+  if (layer_conf_.store_conf().prefetching()) {
+    if (thread_ == nullptr) {
+      buf_keys_.resize(batchsize_);
+      buf_vals_.resize(batchsize_);
+      thread_ = new thread(&StoreInputLayer::fetch_data, this);
+    }
+    thread_->join();
+    delete thread_;
+  } else {
+    fetch_data();
   }
-  else
-    fetch_data(); 
-
-  for (int k = 0; k < batchsize_; k++) 
+  for (int k = 0; k < batchsize_; k++)
     Parse(k, flag, buf_keys_[k], buf_vals_[k]);
-  
   if (layer_conf_.store_conf().prefetching())
-    threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
+    thread_ = new thread(&StoreInputLayer::fetch_data, this);
 }
 
-
 void SingleLabelRecordLayer::Setup(const LayerProto& conf,
     const vector<Layer*>& srclayers) {
   StoreInputLayer::Setup(conf, srclayers);
-
 }
 
 void SingleLabelRecordLayer::ComputeFeature(int flag,
@@ -151,7 +145,7 @@ void SingleLabelRecordLayer::ComputeFeature(int flag,
   if (std_.count()) {
     const float* std = std_.cpu_data();
     for (int k = 0; k < batchsize_; k++) {
-      float* dptr = data_.mutable_cpu_data() + k * std_.count();  
+      float* dptr = data_.mutable_cpu_data() + k * std_.count();
       for (int i = 0; i < std_.count(); i++) {
         dptr[i] /= std[i];
       }


[3/4] incubator-singa git commit: SINGA-130 Data Prefetching

Posted by di...@apache.org.
SINGA-130 Data Prefetching

Fix a bug caused by thread_ not being joined and deleted cleanly.
Always allocate buf_keys_ and buf_vals_, as they are used whether or not
prefetching is enabled.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/991c6ab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/991c6ab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/991c6ab2

Branch: refs/heads/master
Commit: 991c6ab29dd25666cff33b9cf60bde15e660068e
Parents: 4344ab7
Author: Wei Wang <wa...@comp.nus.edu.sg>
Authored: Thu Apr 7 22:50:00 2016 +0800
Committer: Wei Wang <wa...@comp.nus.edu.sg>
Committed: Thu Apr 7 22:50:00 2016 +0800

----------------------------------------------------------------------
 Makefile.am                           |  3 +--
 examples/cifar10/job.conf             |  2 --
 include/singa/neuralnet/input_layer.h | 18 +--------------
 src/driver.cc                         |  1 -
 src/neuralnet/input_layer/prefetch.cc | 37 ------------------------------
 src/neuralnet/input_layer/store.cc    | 13 +++++++++--
 src/proto/job.proto                   |  8 +------
 7 files changed, 14 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index 4eb11e1..a30b9d1 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -64,7 +64,6 @@ SINGA_SRCS := src/driver.cc \
               src/neuralnet/input_layer/onehot.cc \
               src/neuralnet/input_layer/csv.cc \
               src/neuralnet/input_layer/image_preprocess.cc \
-              src/neuralnet/input_layer/prefetch.cc \
               src/neuralnet/input_layer/record.cc \
               src/neuralnet/input_layer/deprecated.cc \
               src/neuralnet/input_layer/store.cc \
@@ -178,7 +177,7 @@ bin_PROGRAMS = singa singatool $(PROGS)
 pydir = $(CURDIR)/tool/python/singa/
 py_LTLIBRARIES = $(PY_PROGS)
 #gpudir = $(CURDIR)/.libs
-#gpu_LTLIBRARIES = libsingagpu.so 
+#gpu_LTLIBRARIES = libsingagpu.so
 
 #lib_LTLIBRARIES = libsinga.la
 libsinga_la_SOURCES = $(PROTO_SRCS) $(SINGA_SRCS)

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/examples/cifar10/job.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf
index cbb95eb..d20b452 100644
--- a/examples/cifar10/job.conf
+++ b/examples/cifar10/job.conf
@@ -38,7 +38,6 @@ neuralnet {
       shape: 3
       shape: 32
       shape: 32
-      #prefetching: false
     }
     include: kTrain
   }
@@ -68,7 +67,6 @@ neuralnet {
       shape: 3
       shape: 32
       shape: 32
-      #prefetching: false
     }
     include: kTest
   }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/include/singa/neuralnet/input_layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h
index 0980f35..0499c4b 100644
--- a/include/singa/neuralnet/input_layer.h
+++ b/include/singa/neuralnet/input_layer.h
@@ -43,7 +43,7 @@ class StoreInputLayer : virtual public InputLayer {
  protected:
   /**
    * Helper method for doing the prefetching, basically read (key,value) pairs
-   * to buf_keys and buf_vals_ vector of size batchsize_. 
+   * to buf_keys and buf_vals_ vector of size batchsize_.
    */
   void fetch_data();
   /**
@@ -152,22 +152,6 @@ class ImagePreprocessLayer : public InputLayer {
   float scale_ = 1;
 };
 
-/**
- * TODO(wangwei) Layer for prefetching data records and parsing them.
- *
- * This layer controls the prefetching thread, i.e.,
- * creating and joining the prefetching thread.
- */
-class PrefetchLayer : public Layer {
- public:
-  ~PrefetchLayer();
-  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
-  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {}
-
- protected:
-  std::thread thread_;
-};
-
 class OneHotLayer : public InputLayer {
  public:
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/src/driver.cc
----------------------------------------------------------------------
diff --git a/src/driver.cc b/src/driver.cc
index 5016c07..2e38e53 100644
--- a/src/driver.cc
+++ b/src/driver.cc
@@ -115,7 +115,6 @@ void Driver::Init(int argc, char **argv) {
   RegisterLayer<LabelLayer, int>(kLabel);
   RegisterLayer<LRNLayer, int>(kLRN);
   RegisterLayer<MnistLayer, int>(kMnist);
-  RegisterLayer<PrefetchLayer, int>(kPrefetch);
   RegisterLayer<PoolingLayer, int>(kPooling);
   RegisterLayer<RBMHidLayer, int>(kRBMHid);
   RegisterLayer<RBMVisLayer, int>(kRBMVis);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/src/neuralnet/input_layer/prefetch.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/prefetch.cc b/src/neuralnet/input_layer/prefetch.cc
deleted file mode 100644
index 9c7f2d9..0000000
--- a/src/neuralnet/input_layer/prefetch.cc
+++ /dev/null
@@ -1,37 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/neuralnet/input_layer.h"
-namespace singa {
-
-using std::vector;
-
-PrefetchLayer::~PrefetchLayer() {
-  if (thread_.joinable())
-    thread_.join();
-}
-
-
-void PrefetchLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
-  LOG(FATAL) << "Not implemented";
-}
-
-}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/src/neuralnet/input_layer/store.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc
index 63b9d05..a4754f4 100644
--- a/src/neuralnet/input_layer/store.cc
+++ b/src/neuralnet/input_layer/store.cc
@@ -27,9 +27,14 @@ namespace singa {
 using std::thread;
 
 StoreInputLayer::~StoreInputLayer() {
+  if (thread_ != nullptr) {
+    thread_->join();
+    delete thread_;
+  }
   if (store_ != nullptr) {
     delete store_;
   }
+
 }
 
 void StoreInputLayer::Setup(const LayerProto& conf,
@@ -74,6 +79,8 @@ void StoreInputLayer::fetch_data() {
       }
       random_skip_--;
     }
+    buf_keys_.resize(batchsize_);
+    buf_vals_.resize(batchsize_);
   }
   for (int k = 0; k < batchsize_; k++) {
     if (!store_->Read(&buf_keys_[k], &buf_vals_[k])) {
@@ -85,20 +92,22 @@ void StoreInputLayer::fetch_data() {
 
 void StoreInputLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
+
   // if prefetching, wait for the thread to finish
   if (layer_conf_.store_conf().prefetching()) {
     if (thread_ == nullptr) {
-      buf_keys_.resize(batchsize_);
-      buf_vals_.resize(batchsize_);
       thread_ = new thread(&StoreInputLayer::fetch_data, this);
     }
     thread_->join();
     delete thread_;
+    thread_ = nullptr;
   } else {
     fetch_data();
   }
+  LOG(ERROR) << "batchsize << " << batchsize_;
   for (int k = 0; k < batchsize_; k++)
     Parse(k, flag, buf_keys_[k], buf_vals_[k]);
+  LOG(ERROR) << "after parse ";
   if (layer_conf_.store_conf().prefetching())
     thread_ = new thread(&StoreInputLayer::fetch_data, this);
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/src/proto/job.proto
----------------------------------------------------------------------
diff --git a/src/proto/job.proto b/src/proto/job.proto
index 0a1f968..b4aa971 100644
--- a/src/proto/job.proto
+++ b/src/proto/job.proto
@@ -228,7 +228,6 @@ message LayerProto {
   // layer specific configuration
   // configuration for input layers, id range [100, 200)
   optional StoreProto store_conf = 100;
-  optional PrefetchProto prefetch_conf = 102;
   optional DataProto lmdbdata_conf = 190;
   optional MnistProto mnist_conf = 192;
   optional RGBImageProto rgbimage_conf = 193;
@@ -359,10 +358,6 @@ message RGBImageProto {
   optional string meanfile = 4 [default = ""];
 }
 
-message PrefetchProto {
-  repeated LayerProto sublayers = 1;
-}
-
 message SplitProto {
   optional int32 num_splits = 1 [default = 1];
 }
@@ -380,7 +375,7 @@ message StoreProto {
   optional bool encoded = 10 [default = false];
   optional int32 random_skip = 11 [default = 0];
   optional bool has_label = 12 [default = true];
-  optional bool prefetching = 13 [default = true];
+  optional bool prefetching = 13 [default = false];
 }
 
 message CharRNNProto {
@@ -652,7 +647,6 @@ enum LayerType {
    */
   kCSVInput = 100;
   kImagePreprocess = 101;
-  kPrefetch = 102;
   kRecordInput = 103;
   kLMDBData = 190;  // deprecated
   kLabel = 191;  // deprecated


[2/4] incubator-singa git commit: SINGA-130 Data prefetching layer

Posted by di...@apache.org.
SINGA-130 Data prefetching layer

Extended StoreInputLayer to support prefetching of data. It maintains a buffer for (key,value) pairs read from the storage
layer. In Setup(), it launches a new thread for reading data into the buffer. This thread stores data into the buffer. The
ComputeFeature() method waits for the thread to finish (join) before parsing the buffered records into the data_ and aux_data_
fields. Finally, it launches another thread.

In terms of memory consumption, this prefetching uses an extra (batchsize*recordsize) bytes for the buffer. However, we observe
no visible runtime improvement, as I/O time is very small (on the order of milliseconds without prefetching, and tens of
microseconds with prefetching) compared to CPU time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/a0bdd0b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/a0bdd0b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/a0bdd0b8

Branch: refs/heads/master
Commit: a0bdd0b85ddba7d670ab04c5de04a29c8366e868
Parents: 5f67e57
Author: Anh Dinh <ug...@gmail.com>
Authored: Tue Apr 5 23:14:36 2016 +0800
Committer: Anh Dinh <ug...@gmail.com>
Committed: Thu Apr 7 18:47:55 2016 +0800

----------------------------------------------------------------------
 examples/cifar10/job.conf             |  2 ++
 include/singa/neuralnet/input_layer.h | 18 ++++++++--
 include/singa/neuralnet/layer.h       |  4 +--
 src/neuralnet/input_layer/record.cc   |  2 +-
 src/neuralnet/input_layer/store.cc    | 57 +++++++++++++++++++++---------
 src/proto/job.proto                   |  2 ++
 6 files changed, 63 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/examples/cifar10/job.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf
index d20b452..cbb95eb 100644
--- a/examples/cifar10/job.conf
+++ b/examples/cifar10/job.conf
@@ -38,6 +38,7 @@ neuralnet {
       shape: 3
       shape: 32
       shape: 32
+      #prefetching: false
     }
     include: kTrain
   }
@@ -67,6 +68,7 @@ neuralnet {
       shape: 3
       shape: 32
       shape: 32
+      #prefetching: false
     }
     include: kTest
   }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/include/singa/neuralnet/input_layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h
index 73d509b..2e4edd1 100644
--- a/include/singa/neuralnet/input_layer.h
+++ b/include/singa/neuralnet/input_layer.h
@@ -25,6 +25,7 @@
 #include <string>
 #include <vector>
 #include <thread>
+#include <deque>
 #include "singa/io/store.h"
 #include "singa/io/kvfile.h"
 #include "singa/neuralnet/layer.h"
@@ -40,9 +41,15 @@ class StoreInputLayer : virtual public InputLayer {
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
 
-
+  
  protected:
   /**
+   * Helper method for doing the prefetching, basically read (key,value) pairs
+   * to buf_keys and buf_vals_ vector of size batchsize_. 
+   */
+  void fetch_data(); 
+
+  /**
    * Parsing the (key, val) tuple to get feature (and label).
    * Subclasses must implment this function.
    * @param[in] k parse this tuple as the k-th instance of one mini-batch.
@@ -54,9 +61,13 @@ class StoreInputLayer : virtual public InputLayer {
   virtual bool Parse(int k, int flag, const string& key, const string& val) = 0;
 
  protected:
+  
   int batchsize_ = 1;
   int random_skip_ = 0;
   io::Store* store_ = nullptr;
+ 
+  vector<std::string> buf_keys_, buf_vals_; 
+  std::deque<std::thread> threads_; // prefetching thread
 };
 
 /**
@@ -68,8 +79,8 @@ class SingleLabelRecordLayer : public StoreInputLayer {
  public:
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
-
- protected:
+  protected:
+  
   /**
    * Load a single record (tuple), e.g., the mean or standard variance vector.
    */
@@ -84,6 +95,7 @@ class SingleLabelRecordLayer : public StoreInputLayer {
    * UFLDL</a>
    */
   Blob<float> mean_, std_;
+  
 };
 /**
  * Specific layer that parses the value string loaded by Store as a line from

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/include/singa/neuralnet/layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/layer.h b/include/singa/neuralnet/layer.h
index ce47b47..c8ea3fc 100644
--- a/include/singa/neuralnet/layer.h
+++ b/include/singa/neuralnet/layer.h
@@ -208,7 +208,7 @@ class Layer {
   /**
    * @return a const ref for Blob vector storing feature values of this layer.
    */
-  virtual const vector<Blob<float>*>& data() const {
+  virtual const vector<Blob<float>*>& data() {
     return datavec_;
   }
 
@@ -252,7 +252,7 @@ class Layer {
   /**
    * @return auxiliary data, e.g., image label.
    */
-  virtual const vector<AuxType>& aux_data(const Layer* from = nullptr) const {
+  virtual const vector<AuxType>& aux_data(const Layer* from = nullptr) {
     return aux_data_;
   }
   /**

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/neuralnet/input_layer/record.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/record.cc b/src/neuralnet/input_layer/record.cc
index b14fc80..67d312d 100644
--- a/src/neuralnet/input_layer/record.cc
+++ b/src/neuralnet/input_layer/record.cc
@@ -52,7 +52,7 @@ bool RecordInputLayer::Parse(int k, int flag, const string& key,
   int size = data_.count() / batchsize_;
   if (image.data_size()) {
     CHECK_EQ(size, image.data_size());
-    float* ptr = data_.mutable_cpu_data() + k * size;
+    float* ptr = data_.mutable_cpu_data() + k * size; 
     for (int i = 0; i< size; i++)
       ptr[i] = image.data(i);
   } else if (image.pixel().size()) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/neuralnet/input_layer/store.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc
index 3b642ca..ff9eb8d 100644
--- a/src/neuralnet/input_layer/store.cc
+++ b/src/neuralnet/input_layer/store.cc
@@ -22,9 +22,11 @@
 #include "singa/neuralnet/input_layer.h"
 #include "singa/utils/context.h"
 #include "singa/utils/singleton.h"
-
+#include <time.h>
 namespace singa {
 
+using std::thread;
+
 StoreInputLayer::~StoreInputLayer() {
   if (store_ != nullptr) {
     delete store_;
@@ -44,11 +46,21 @@ void StoreInputLayer::Setup(const LayerProto& conf,
   } else {
     batchsize_ = conf.store_conf().batchsize(0);
   }
+
+  vector<int> shape {batchsize_};
+  for (int s : conf.store_conf().shape())
+    shape.push_back(s);
+  data_.Reshape(shape);
+  aux_data_.resize(batchsize_);
+  buf_keys_.resize(batchsize_); 
+  buf_vals_.resize(batchsize_); 
+
+  // initialize prefetch buffer and start the thread
+  if (conf.store_conf().prefetching())
+    threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
 }
 
-void StoreInputLayer::ComputeFeature(int flag,
-    const vector<Layer*>& srclayers) {
-  string key, val;
+void StoreInputLayer::fetch_data(){
   if (store_ == nullptr) {
     store_ = io::OpenStore(layer_conf_.store_conf().backend(),
         layer_conf_.store_conf().path(),
@@ -61,6 +73,7 @@ void StoreInputLayer::ComputeFeature(int flag,
       random_skip_ = distribution(*generator);
     }
 
+    string key, val;
     while (random_skip_ > 0) {
       if (!store_->Read(&key, &val)) {
         store_->SeekToFirst();
@@ -70,30 +83,43 @@ void StoreInputLayer::ComputeFeature(int flag,
     }
   }
   for (int k = 0; k < batchsize_; k++) {
-    if (!store_->Read(&key, &val)) {
+    if (!store_->Read(&buf_keys_[k], &buf_vals_[k])) {
       store_->SeekToFirst();
-      CHECK(store_->Read(&key, &val));
+      CHECK(store_->Read(&buf_keys_[k], &buf_vals_[k]));
     }
-    // TODO(wangwei) random skip and shuffle among this mini-batch
-    Parse(k, flag, key, val);
   }
 }
 
+void StoreInputLayer::ComputeFeature(int flag,
+    const vector<Layer*>& srclayers) {
+  // if prefetching, wait for the thread to finish
+  if (layer_conf_.store_conf().prefetching()){
+    threads_.front().join(); 
+    threads_.pop_front(); 
+  }
+  else
+    fetch_data(); 
+
+  for (int k = 0; k < batchsize_; k++) 
+    Parse(k, flag, buf_keys_[k], buf_vals_[k]);
+  
+  if (layer_conf_.store_conf().prefetching())
+    threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
+}
+
+
 void SingleLabelRecordLayer::Setup(const LayerProto& conf,
     const vector<Layer*>& srclayers) {
   StoreInputLayer::Setup(conf, srclayers);
 
-  vector<int> shape {batchsize_};
-  for (int s : conf.store_conf().shape())
-    shape.push_back(s);
-  data_.Reshape(shape);
-  aux_data_.resize(batchsize_);
 }
+
 void SingleLabelRecordLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
-  StoreInputLayer::ComputeFeature(flag, srclayers);
 
+  StoreInputLayer::ComputeFeature(flag, srclayers);
   auto& store_conf = layer_conf_.store_conf();
+
   if (store_conf.has_mean_file() && mean_.count() == 0) {
     mean_.Reshape(vector<int>{data_.count() / batchsize_});
     LoadRecord(store_conf.backend(), store_conf.mean_file(), &mean_);
@@ -125,7 +151,7 @@ void SingleLabelRecordLayer::ComputeFeature(int flag,
   if (std_.count()) {
     const float* std = std_.cpu_data();
     for (int k = 0; k < batchsize_; k++) {
-      float* dptr = data_.mutable_cpu_data() + k * std_.count();
+      float* dptr = data_.mutable_cpu_data() + k * std_.count();  
       for (int i = 0; i < std_.count(); i++) {
         dptr[i] /= std[i];
       }
@@ -133,5 +159,4 @@ void SingleLabelRecordLayer::ComputeFeature(int flag,
   }
 }
 
-
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/proto/job.proto
----------------------------------------------------------------------
diff --git a/src/proto/job.proto b/src/proto/job.proto
index 7157a84..0a1f968 100644
--- a/src/proto/job.proto
+++ b/src/proto/job.proto
@@ -380,7 +380,9 @@ message StoreProto {
   optional bool encoded = 10 [default = false];
   optional int32 random_skip = 11 [default = 0];
   optional bool has_label = 12 [default = true];
+  optional bool prefetching = 13 [default = true];
 }
+
 message CharRNNProto {
   optional string path = 1;
   optional string vocab_path = 2;


[4/4] incubator-singa git commit: SINGA-130 Data prefetching

Posted by di...@apache.org.
SINGA-130 Data prefetching


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/cf4be5a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/cf4be5a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/cf4be5a8

Branch: refs/heads/master
Commit: cf4be5a86bb603cbcc8a0c38c5582b68ef0ffbd8
Parents: 8329aa0 991c6ab
Author: Anh Dinh <ug...@gmail.com>
Authored: Fri Apr 8 10:31:57 2016 +0800
Committer: Anh Dinh <ug...@gmail.com>
Committed: Fri Apr 8 10:31:57 2016 +0800

----------------------------------------------------------------------
 Makefile.am                           |  3 +-
 include/singa/neuralnet/input_layer.h | 24 ++++--------
 include/singa/neuralnet/layer.h       |  4 +-
 src/driver.cc                         |  1 -
 src/neuralnet/input_layer/prefetch.cc | 37 ------------------
 src/neuralnet/input_layer/store.cc    | 60 ++++++++++++++++++++++--------
 src/proto/job.proto                   |  8 +---
 7 files changed, 56 insertions(+), 81 deletions(-)
----------------------------------------------------------------------