You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/01/02 16:20:05 UTC

[4/7] incubator-singa git commit: SINGA-97 Add HDFS Store

SINGA-97 Add HDFS Store

This ticket implements an HDFS Store for reading data from HDFS. It complements
the existing CSV Store, which reads data from CSV files. HDFS is a popular
distributed file system with high (sequential) I/O throughput, so supporting
it is necessary for SINGA to scale.

HDFS usage in SINGA is different from that in standard MapReduce applications.
Specifically, each SINGA worker may train on sequences of records that do not
lie within block boundaries, whereas in MapReduce each Mapper processes a number
of complete blocks. In MapReduce, the runtime engine may fetch and cache the
entire block over the network, knowing that the block will be processed
entirely. In SINGA, such a pre-fetching and caching strategy would be
sub-optimal because it wastes I/O and network bandwidth on records that are
not used.

We defer I/O optimization to a future ticket.

For the implementation, we use `libhdfs3` from Pivotal as the C++ HDFS client.
This library is built natively in C++, hence it is more optimized and easier to
deploy than the original `libhdfs` library shipped with Hadoop, which wraps the
Java client through JNI. libhdfs3 makes extensive use of short-circuit reads to
speed up local reads, and it often complains when that option is not set.

Finally, we tested the implementation in a distributed environment set up from
a number of Docker containers, using both the CIFAR and MNIST examples.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/8a07a294
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/8a07a294
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/8a07a294

Branch: refs/heads/master
Commit: 8a07a29462c6d8ad1d2da17da4a018dfc327c121
Parents: 9ff176c
Author: Anh Dinh <ug...@gmail.com>
Authored: Thu Nov 26 17:44:15 2015 +0800
Committer: WANG Sheng <wa...@gmail.com>
Committed: Sat Jan 2 19:58:14 2016 +0800

----------------------------------------------------------------------
 Makefile.am                       |  12 ++-
 examples/cifar10/Makefile.example |   5 ++
 examples/cifar10/create_data.cc   |  30 +++++---
 examples/mnist/Makefile.example   |   5 ++
 examples/mnist/create_data.cc     |  13 +++-
 include/singa/io/hdfs_store.h     |  38 +++++++++-
 include/singa/io/hdfsfile.h       | 131 ++++++++++++++++++++++++++++++++
 include/singa/io/kvfile_store.h   |   1 +
 include/singa/io/store.h          |   6 ++
 include/singa/io/textfile_store.h |   1 +
 src/io/hdfsfile.cc                | 135 +++++++++++++++++++++++++++++++++
 src/io/hdfsfile_store.cc          |  75 ++++++++++++++++++
 src/io/kvfile_store.cc            |   3 +
 src/io/store.cc                   |  10 ++-
 src/io/textfile_store.cc          |   3 +
 15 files changed, 448 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index 6466f92..d78a150 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -33,7 +33,7 @@ CFLAGS += $(DEBUG)
 CXXFLAGS += $(DEBUG)
 AC_CXXFLAGS = $(DEBUG)
 
-INCLUDES = -I$(top_srcdir)/include
+INCLUDES = -I$(top_srcdir)/include -I/usr/local/include/hdfs
 
 PROTOS := $(top_srcdir)/src/proto/singa.proto \
           $(top_srcdir)/src/proto/job.proto \
@@ -102,6 +102,8 @@ SINGA_SRCS := src/driver.cc \
               src/io/kvfile_store.cc \
               src/io/textfile_store.cc \
               src/io/store.cc \
+              src/io/hdfsfile.cc \
+              src/io/hdfsfile_store.cc \
               src/utils/cluster.cc \
               src/utils/cluster_rt.cc \
               src/utils/graph.cc \
@@ -144,6 +146,8 @@ SINGA_HDRS := include/singa.h \
               include/singa/io/kvfile_store.h \
               include/singa/io/textfile_store.h \
               include/mshadow/cxxnet_op.h \
+              include/singa/io/hdfsfile.h \
+              include/singa/io/hdfs_store.h \
               include/mshadow/tensor_expr.h \
               include/mshadow/tensor_container.h \
               include/mshadow/tensor_expr_ext.h \
@@ -209,7 +213,8 @@ singa_LDFLAGS = -lsinga \
                 -lopenblas \
                 -lzmq \
                 -lczmq \
-                -lzookeeper_mt
+                -lzookeeper_mt \
+                -lhdfs3
 if LMDB
 singa_LDFLAGS += -llmdb
 endif
@@ -233,7 +238,8 @@ singatool_CXXFLAGS = -Wall -pthread -fPIC -std=c++11 -MMD -Wno-unknown-pragmas \
 singatool_LDFLAGS = -lsinga \
                     -lglog  \
                     -lprotobuf \
-                    -lzookeeper_mt 
+                    -lzookeeper_mt \
+                    -lhdfs3
 
 #if DCUDA
 #singatool_SOURCES += $(CUDA_SRCS) $(CUDA_HDRS)  

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/examples/cifar10/Makefile.example
----------------------------------------------------------------------
diff --git a/examples/cifar10/Makefile.example b/examples/cifar10/Makefile.example
index dd65d7d..775e165 100644
--- a/examples/cifar10/Makefile.example
+++ b/examples/cifar10/Makefile.example
@@ -28,6 +28,11 @@ cifar-10-binary-bin:
 	wget http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz
 	tar xf cifar-10-binary.tar.gz
 
+compile:
+	$(CXX) create_data.cc -std=c++11 -lsinga -lprotobuf -lglog -lhdfs3 \
+		-I../../include -L../../.libs/ -Wl,-unresolved-symbols=ignore-in-shared-libs \
+		-Wl,-rpath=../../.libs/  -o create_data.bin
+
 create:
 	$(CXX) create_data.cc -std=c++11 -lsinga -lprotobuf -lglog \
 		-I../../include -L../../.libs/ -Wl,-unresolved-symbols=ignore-in-shared-libs \

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/examples/cifar10/create_data.cc
----------------------------------------------------------------------
diff --git a/examples/cifar10/create_data.cc b/examples/cifar10/create_data.cc
index 5873c0e..05169d8 100644
--- a/examples/cifar10/create_data.cc
+++ b/examples/cifar10/create_data.cc
@@ -19,16 +19,16 @@
 *
 *************************************************************/
 
-//
-// This code creates training and test DataShard for CIFAR dataset.
-// It is adapted from the convert_cifar_data from Caffe
-//
-// Usage:
-//    create_shard.bin input_folder output_folder
-//
-// The CIFAR dataset could be downloaded at
-//    http://www.cs.toronto.edu/~kriz/cifar.html
-//
+
+/**
+ * Create training and test data stores for the CIFAR dataset.
+ * It is adapted from convert_cifar_data in Caffe. Usage:
+ *    create_data.bin <input_folder> <output_folder>
+ *
+ * The store backend (e.g. kvfile or hdfsfile) is read from the store_conf of
+ * the first layer in job.conf.
+ * To load to HDFS, specify "hdfs://namenode:port/examples" as output folder.
+ */
 
 #include <glog/logging.h>
 #include <fstream>
@@ -38,6 +38,8 @@
 
 #include "singa/io/store.h"
 #include "singa/proto/common.pb.h"
+#include "singa/utils/common.h"
+#include "singa/proto/job.pb.h"
 
 using std::string;
 
@@ -45,6 +47,7 @@ const int kCIFARSize = 32;
 const int kCIFARImageNBytes = 3072;
 const int kCIFARBatchSize = 10000;
 const int kCIFARTrainBatches = 5;
+const char JOB_CONFIG[] = "job.conf";
 
 void read_image(std::ifstream* file, int* label, char* buffer) {
   char label_char;
@@ -58,7 +61,6 @@ void create_data(const string& input_folder, const string& output_folder) {
   int label;
   char str_buffer[kCIFARImageNBytes];
   string rec_buf;
-
   singa::RecordProto image;
   image.add_shape(3);
   image.add_shape(kCIFARSize);
@@ -69,7 +71,15 @@ void create_data(const string& input_folder, const string& output_folder) {
   for (int i = 0; i < kCIFARImageNBytes; i++)
     mean.add_data(0.f);
 
-  auto store = singa::io::CreateStore("kvfile");
+  // The storage backend is taken from the 1st layer's store_conf in job.conf.
+  singa::JobProto job_proto;
+  singa::ReadProtoFromTextFile(JOB_CONFIG, &job_proto);
+  CHECK_GT(job_proto.neuralnet().layer_size(), 0)
+      << "no layer defined in " << JOB_CONFIG;
+  string store_backend =
+      job_proto.neuralnet().layer(0).store_conf().backend();
+  auto store = singa::io::CreateStore(store_backend);
+  CHECK(store != nullptr) << "unknown store backend: " << store_backend;
   CHECK(store->Open(output_folder + "/train_data.bin", singa::io::kCreate));
   LOG(INFO) << "Preparing training data";
   int count = 0;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/examples/mnist/Makefile.example
----------------------------------------------------------------------
diff --git a/examples/mnist/Makefile.example b/examples/mnist/Makefile.example
index 733633d..ba2308b 100644
--- a/examples/mnist/Makefile.example
+++ b/examples/mnist/Makefile.example
@@ -33,6 +33,11 @@ mnist:
 	gunzip train-images-idx3-ubyte.gz && gunzip train-labels-idx1-ubyte.gz
 	gunzip t10k-images-idx3-ubyte.gz && gunzip t10k-labels-idx1-ubyte.gz
 
+compile:
+	$(CXX) create_data.cc -std=c++11 -lsinga -lprotobuf -lglog -lhdfs3 -I../../include \
+		-L../../.libs/ -Wl,-unresolved-symbols=ignore-in-shared-libs -Wl,-rpath=../../.libs/ \
+		-o create_data.bin
+
 create:
 	$(CXX) create_data.cc -std=c++11 -lsinga -lprotobuf -lglog -I../../include \
 		-L../../.libs/ -Wl,-unresolved-symbols=ignore-in-shared-libs -Wl,-rpath=../../.libs/ \

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/examples/mnist/create_data.cc
----------------------------------------------------------------------
diff --git a/examples/mnist/create_data.cc b/examples/mnist/create_data.cc
index 5e51e97..66a4905 100644
--- a/examples/mnist/create_data.cc
+++ b/examples/mnist/create_data.cc
@@ -38,14 +38,19 @@
 #include "singa/io/store.h"
 #include "singa/utils/common.h"
 #include "singa/proto/common.pb.h"
+#include "singa/proto/job.pb.h"
 
 using std::string;
 
+const char JOB_CONFIG[] = "job.conf";
+
 uint32_t swap_endian(uint32_t val) {
     val = ((val << 8) & 0xFF00FF00) | ((val >> 8) & 0xFF00FF);
     return (val << 16) | (val >> 16);
 }
 
+// output is the full path of the store, unlike create_data for CIFAR, where
+// the output argument only specifies the directory.
 void create_data(const char* image_filename, const char* label_filename,
         const char* output) {
   // Open files
@@ -76,7 +81,13 @@ void create_data(const char* image_filename, const char* label_filename,
   image_file.read(reinterpret_cast<char*>(&cols), 4);
   cols = swap_endian(cols);
 
-  auto store = singa::io::OpenStore("kvfile", output, singa::io::kCreate);
+  // read backend from job.conf; JOB_CONFIG is a char array, so no c_str()
+  singa::JobProto job_proto;
+  singa::ReadProtoFromTextFile(JOB_CONFIG, &job_proto);
+  string store_backend =
+    job_proto.neuralnet().layer(0).store_conf().backend();
+
+  auto store = singa::io::OpenStore(store_backend, output, singa::io::kCreate);
   char label;
   char* pixels = new char[rows * cols];
   int count = 0;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/include/singa/io/hdfs_store.h
----------------------------------------------------------------------
diff --git a/include/singa/io/hdfs_store.h b/include/singa/io/hdfs_store.h
index f85615b..1fb9258 100644
--- a/include/singa/io/hdfs_store.h
+++ b/include/singa/io/hdfs_store.h
@@ -19,4 +19,46 @@
 *
 *************************************************************/
 
-// TODO(wangwei) use hdfs as data storage
+#ifndef SINGA_IO_HDFS_STORE_H_
+#define SINGA_IO_HDFS_STORE_H_
+
+#include <string>
+#include "singa/io/store.h"
+#include "singa/io/hdfsfile.h"
+
+namespace singa {
+namespace io {
+
+/**
+ * HDFS implementation of the Store interface. Records are stored in a single
+ * HDFS file, each in the format [<length><content>].
+ *
+ * Only the value is persisted: Write() ignores its key argument and Read()
+ * does not fill its key argument.
+ *
+ * The store owns its HDFSFile, so it is neither copyable nor assignable.
+ */
+class HDFSStore : public Store {
+ public:
+  HDFSStore() = default;
+  HDFSStore(const HDFSStore&) = delete;
+  HDFSStore& operator=(const HDFSStore&) = delete;
+  ~HDFSStore() { Close(); }
+  bool Open(const std::string& source, Mode mode) override;
+  void Close() override;
+  bool Read(std::string* key, std::string* value) override;
+  void SeekToFirst() override;
+  void Seek(int offset) override;
+  bool Write(const std::string& key, const std::string& value) override;
+  void Flush() override;
+
+ private:
+  HDFSFile* file_ = nullptr;
+  // kRead until Open() records the real mode
+  Mode mode_ = kRead;
+};
+
+}  // namespace io
+}  // namespace singa
+
+#endif  // SINGA_IO_HDFS_STORE_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/include/singa/io/hdfsfile.h
----------------------------------------------------------------------
diff --git a/include/singa/io/hdfsfile.h b/include/singa/io/hdfsfile.h
new file mode 100644
index 0000000..f92910e
--- /dev/null
+++ b/include/singa/io/hdfsfile.h
@@ -0,0 +1,141 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_IO_HDFSFILE_H_
+#define SINGA_IO_HDFSFILE_H_
+
+#include <fstream>
+#include <string>
+#include <unordered_set>
+
+// Enable the protobuf overloads of Next()/Insert(). Guarded so that a
+// build-level -DUSE_PROTOBUF does not trigger a macro redefinition.
+#ifndef USE_PROTOBUF
+#define USE_PROTOBUF 1
+#endif
+
+#ifdef USE_PROTOBUF
+#include <google/protobuf/message.h>
+#endif
+
+#include <hdfs.h>
+
+namespace singa {
+namespace io {
+
+/**
+ * HDFSFile represents a specific partition of the HDFS file storing
+ * training/validation or test data. The HDFS client library maintains its
+ * own buffer, so this class does not add one.
+ *
+ * Each record is of the form <length><content>, where <length> is the
+ * byte count of <content> written as a raw (native-endian) int.
+ *
+ * An HDFSFile owns its HDFS connection and open-file handle, so it is
+ * neither copyable nor assignable.
+ */
+class HDFSFile {
+ public:
+  enum Mode {
+    // read only mode used in training
+    kRead = 0,
+    // write mode used in creating HDFSFile (will overwrite previous one)
+    kCreate = 1,
+    // append mode, e.g. used when previous creating crashes
+    kAppend = 2
+  };
+
+  /**
+   * Connect to HDFS and open the file at @p path in the given @p mode.
+   *
+   * @param path path to file, of the form "hdfs://namenode:port/file_path"
+   * @param mode HDFSFile::kRead, HDFSFile::kCreate or HDFSFile::kAppend
+   */
+  HDFSFile(const std::string& path, Mode mode);
+  ~HDFSFile();
+
+  HDFSFile(const HDFSFile&) = delete;
+  HDFSFile& operator=(const HDFSFile&) = delete;
+
+#ifdef USE_PROTOBUF
+  /**
+   * Read the next record from the HDFSFile and parse it into @p val.
+   *
+   * @param val message to fill with the parsed record
+   * @return false if no complete record could be read, e.g. at end of file
+   */
+  bool Next(google::protobuf::Message* val);
+  /**
+   * Serialize @p tuple and append it to the HDFSFile as one record.
+   *
+   * @param tuple message to append
+   * @return true on success; write failures abort via CHECK
+   */
+  bool Insert(const google::protobuf::Message& tuple);
+#endif
+
+  /**
+   * Read the next record from the HDFSFile.
+   *
+   * @param val receives the record content
+   * @return false if no complete record could be read, e.g. at end of file
+   */
+  bool Next(std::string* val);
+  /**
+   * Append one record to the end of the HDFSFile.
+   *
+   * @param tuple record content
+   * @return true on success; write failures abort via CHECK
+   */
+  bool Insert(const std::string& tuple);
+  /**
+   * Move the read pointer to byte @p offset of the file, e.g. 0 to re-read
+   * from the beginning. The offset should be a record boundary because
+   * Next() expects a length prefix there. Only valid in kRead mode.
+   */
+  void Seek(int offset);
+
+  /**
+   * Flush buffered data to HDFS.
+   * Used only for kCreate or kAppend.
+   */
+  void Flush();
+  /**
+   * @return path to the HDFSFile, as passed to the constructor
+   */
+  inline std::string path() const { return path_; }
+
+ private:
+  std::string path_ = "";
+  Mode mode_;
+  // handle to HDFS
+  hdfsFS fs_ = nullptr;
+  // handle to the HDFS open file
+  hdfsFile file_ = nullptr;
+
+  // intended to avoid replicated records; not used by hdfsfile.cc yet
+  std::unordered_set<std::string> keys_;
+};
+}  // namespace io
+
+}  // namespace singa
+
+#endif  // SINGA_IO_HDFSFILE_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/include/singa/io/kvfile_store.h
----------------------------------------------------------------------
diff --git a/include/singa/io/kvfile_store.h b/include/singa/io/kvfile_store.h
index 74ff127..73e4127 100644
--- a/include/singa/io/kvfile_store.h
+++ b/include/singa/io/kvfile_store.h
@@ -41,6 +41,7 @@ class KVFileStore : public Store {
   void Close() override;
   bool Read(std::string* key, std::string* value) override;
   void SeekToFirst() override;
+  void Seek(int offset) override;
   bool Write(const std::string& key, const std::string& value) override;
   void Flush() override;
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/include/singa/io/store.h
----------------------------------------------------------------------
diff --git a/include/singa/io/store.h b/include/singa/io/store.h
index 15afb6a..a63a981 100644
--- a/include/singa/io/store.h
+++ b/include/singa/io/store.h
@@ -68,6 +68,13 @@ class Store {
    * Seek the read header to the first tuple.
    */
   virtual void SeekToFirst() = 0;
+
+  /**
+   * Seek to an offset (a byte offset for HDFS). This allows concurrent
+   * workers to start reading from different positions. The default ignores
+   * the request, so backends without random access need not override it.
+   */
+  virtual void Seek(int /*offset*/) {}
   /**
    * Write a tuple.
    *

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/include/singa/io/textfile_store.h
----------------------------------------------------------------------
diff --git a/include/singa/io/textfile_store.h b/include/singa/io/textfile_store.h
index dcc559d..2cc6571 100644
--- a/include/singa/io/textfile_store.h
+++ b/include/singa/io/textfile_store.h
@@ -41,6 +41,7 @@ class TextFileStore : public Store {
   void Close() override;
   bool Read(std::string* key, std::string* value) override;
   void SeekToFirst() override;
+  void Seek(int offset) override;
   bool Write(const std::string& key, const std::string& value) override;
   void Flush() override;
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/src/io/hdfsfile.cc
----------------------------------------------------------------------
diff --git a/src/io/hdfsfile.cc b/src/io/hdfsfile.cc
new file mode 100644
index 0000000..e093d81
--- /dev/null
+++ b/src/io/hdfsfile.cc
@@ -0,0 +1,177 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "singa/io/hdfsfile.h"
+
+#include <fcntl.h>
+#include <glog/logging.h>
+#include <cstdlib>
+#include <limits>
+#include <string>
+
+namespace singa {
+namespace io {
+
+namespace {
+
+// Length of the "hdfs://" scheme prefix.
+const size_t kSchemeLen = 7;
+
+// Read exactly `len` bytes into `buf`. hdfsRead may return fewer bytes than
+// requested (e.g. when a read crosses a block boundary), so keep reading.
+// Returns the number of bytes read, which is less than `len` only at end of
+// file, or -1 on a read error.
+tSize ReadFully(hdfsFS fs, hdfsFile file, char* buf, tSize len) {
+  tSize total = 0;
+  while (total < len) {
+    tSize n = hdfsRead(fs, file, buf + total, len - total);
+    if (n < 0)
+      return -1;
+    if (n == 0)
+      break;  // end of file
+    total += n;
+  }
+  return total;
+}
+
+}  // namespace
+
+HDFSFile::HDFSFile(const std::string& path, Mode mode): path_(path),
+  mode_(mode) {
+  // path must look like hdfs://namenode[:port]/file_path
+  CHECK_EQ(path.find("hdfs://"), 0u) << "not an HDFS path: " << path;
+  size_t path_idx = path.find('/', kSchemeLen);
+  CHECK(path_idx != std::string::npos) << "no file path in " << path;
+
+  // extract namenode and optional port from the authority part
+  std::string authority = path.substr(kSchemeLen, path_idx - kSchemeLen);
+  size_t colon_idx = authority.find(':');
+  std::string namenode = authority.substr(0, colon_idx);
+  CHECK(!namenode.empty()) << "no namenode in " << path;
+  // Without an explicit port, pass 0 and leave the choice to libhdfs3
+  // (NOTE(review): assumed to fall back to its configured default; confirm).
+  tPort port = 0;
+  if (colon_idx != std::string::npos)
+    port = static_cast<tPort>(atoi(authority.c_str() + colon_idx + 1));
+  std::string filepath = path.substr(path_idx);
+
+  // connect to HDFS
+  fs_ = hdfsConnect(namenode.c_str(), port);
+  CHECK(fs_ != nullptr) << "cannot connect to HDFS namenode " << namenode;
+
+  if (mode == HDFSFile::kRead) {
+    file_ = hdfsOpenFile(fs_, filepath.c_str(), O_RDONLY, 0, 0, 0);
+  } else {
+    // Create the parent directory if it does not exist (hdfsExists returns
+    // 0 when the path exists). Files directly under "/" need no directory.
+    std::string dir = filepath.substr(0, filepath.find_last_of('/'));
+    if (!dir.empty() && hdfsExists(fs_, dir.c_str()) != 0)
+      CHECK_EQ(hdfsCreateDirectory(fs_, dir.c_str()), 0)
+          << "cannot create directory " << dir;
+    // kCreate truncates an existing file; kAppend keeps its records and
+    // writes after them.
+    int flags = O_WRONLY;
+    if (mode == HDFSFile::kAppend)
+      flags |= O_APPEND;
+    file_ = hdfsOpenFile(fs_, filepath.c_str(), flags, 0, 0, 0);
+  }
+
+  CHECK(file_ != nullptr) << "cannot open " << path;
+}
+
+HDFSFile::~HDFSFile() {
+  if (file_ != nullptr) {
+    // Log instead of CHECK so that a failed flush does not abort inside a
+    // destructor.
+    if (mode_ != HDFSFile::kRead && hdfsFlush(fs_, file_) != 0)
+      LOG(ERROR) << "failed to flush " << path_;
+    hdfsCloseFile(fs_, file_);
+  }
+  // release the connection opened by hdfsConnect in the constructor
+  if (fs_ != nullptr)
+    hdfsDisconnect(fs_);
+}
+
+#ifdef USE_PROTOBUF
+bool HDFSFile::Next(google::protobuf::Message* val) {
+  // reuse the string reader so partial reads and truncation are handled once
+  std::string buf;
+  if (!Next(&buf))
+    return false;
+  return val->ParseFromString(buf);
+}
+
+bool HDFSFile::Insert(const google::protobuf::Message& val) {
+  std::string str;
+  val.SerializeToString(&str);
+  return Insert(str);
+}
+#endif
+
+// Read one <length><content> record. Returns false at end of file or when the
+// record is incomplete/corrupt; aborts on an HDFS read error.
+bool HDFSFile::Next(std::string* val) {
+  int size = 0;
+  tSize n = ReadFully(fs_, file_, reinterpret_cast<char*>(&size),
+                      static_cast<tSize>(sizeof(size)));
+  CHECK_GE(n, 0) << "read error on " << path_;
+  if (n == 0)
+    return false;  // clean end of file
+  if (n != static_cast<tSize>(sizeof(size)) || size < 0) {
+    LOG(ERROR) << "incomplete or corrupt record length in " << path_;
+    return false;
+  }
+  val->resize(size);
+  if (size > 0) {
+    n = ReadFully(fs_, file_, &(*val)[0], size);
+    CHECK_GE(n, 0) << "read error on " << path_;
+    if (n != size) {
+      LOG(ERROR) << "truncated record in " << path_;
+      return false;
+    }
+  }
+  return true;
+}
+
+// append one record (length, then content) to the end of the file
+bool HDFSFile::Insert(const std::string& val) {
+  CHECK(mode_ != HDFSFile::kRead);
+  CHECK_LE(val.size(),
+           static_cast<size_t>(std::numeric_limits<int>::max()));
+  int size = static_cast<int>(val.size());
+  CHECK_EQ(hdfsWrite(fs_, file_, &size, sizeof(size)),
+           static_cast<tSize>(sizeof(size)));
+  CHECK_EQ(hdfsWrite(fs_, file_, val.data(), size), size);
+  return true;
+}
+
+// Move the read pointer to byte `offset`; only valid in kRead mode.
+void HDFSFile::Seek(int offset) {
+  CHECK_EQ(mode_, kRead);
+  CHECK_EQ(hdfsSeek(fs_, file_, offset), 0);
+}
+
+void HDFSFile::Flush() {
+  CHECK_EQ(hdfsFlush(fs_, file_), 0);
+}
+
+}  // namespace io
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/src/io/hdfsfile_store.cc
----------------------------------------------------------------------
diff --git a/src/io/hdfsfile_store.cc b/src/io/hdfsfile_store.cc
new file mode 100644
index 0000000..9464169
--- /dev/null
+++ b/src/io/hdfsfile_store.cc
@@ -0,0 +1,80 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include <glog/logging.h>
+#include "singa/io/hdfs_store.h"
+
+namespace singa {
+namespace io {
+
+// Open `source` (an "hdfs://..." path) in the HDFSFile mode matching `mode`.
+bool HDFSStore::Open(const std::string& source, Mode mode) {
+  CHECK(file_ == nullptr);
+  if (mode == kRead)
+    file_ = new HDFSFile(source, HDFSFile::kRead);
+  else if (mode == kCreate)
+    file_ = new HDFSFile(source, HDFSFile::kCreate);
+  else if (mode == kAppend)
+    file_ = new HDFSFile(source, HDFSFile::kAppend);
+  mode_ = mode;
+  return file_ != nullptr;
+}
+
+void HDFSStore::Close() {
+  delete file_;  // deleting nullptr is a no-op
+  file_ = nullptr;
+}
+
+// Only the value is stored in HDFS, so `key` is left untouched.
+bool HDFSStore::Read(std::string* key, std::string* value) {
+  CHECK_EQ(mode_, kRead);
+  CHECK(file_ != nullptr);
+  return file_->Next(value);
+}
+
+void HDFSStore::SeekToFirst() {
+  CHECK_EQ(mode_, kRead);
+  CHECK(file_ != nullptr);
+  file_->Seek(0);
+}
+
+// Seek to byte `offset`, e.g. so that workers start at different positions.
+void HDFSStore::Seek(int offset) {
+  CHECK_EQ(mode_, kRead);
+  CHECK(file_ != nullptr);
+  file_->Seek(offset);
+}
+
+// Only `value` is persisted; `key` is ignored.
+bool HDFSStore::Write(const std::string& key, const std::string& value) {
+  CHECK_NE(mode_, kRead);
+  CHECK(file_ != nullptr);
+  return file_->Insert(value);
+}
+
+void HDFSStore::Flush() {
+  CHECK_NE(mode_, kRead);
+  CHECK(file_ != nullptr);
+  file_->Flush();
+}
+
+}  // namespace io
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/src/io/kvfile_store.cc
----------------------------------------------------------------------
diff --git a/src/io/kvfile_store.cc b/src/io/kvfile_store.cc
index fbf6982..79e2a40 100644
--- a/src/io/kvfile_store.cc
+++ b/src/io/kvfile_store.cc
@@ -55,6 +55,14 @@ void KVFileStore::SeekToFirst() {
   CHECK(file_ != nullptr);
   file_->SeekToFirst();
 }
+
+// Seeking to an arbitrary offset is not implemented for KVFile, so the offset
+// is ignored; say so once instead of dropping the request silently.
+void KVFileStore::Seek(int offset) {
+  LOG_FIRST_N(WARNING, 1) << "KVFileStore does not support Seek; ignoring "
+                          << "offset " << offset;
+}
+
 bool KVFileStore::Write(const std::string& key, const std::string& value) {
   CHECK_NE(mode_, kRead);
   CHECK(file_ != nullptr);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/src/io/store.cc
----------------------------------------------------------------------
diff --git a/src/io/store.cc b/src/io/store.cc
index 530ca58..1e5a17f 100644
--- a/src/io/store.cc
+++ b/src/io/store.cc
@@ -22,8 +22,12 @@
 #include "singa/io/store.h"
 #include "singa/io/kvfile_store.h"
 #include "singa/io/textfile_store.h"
+#include "singa/io/hdfs_store.h"
+
+#define USE_HDFS 1
 
 namespace singa { namespace io {
+
 Store* CreateStore(const std::string& backend) {
   Store *store = nullptr;
   if (backend.compare("textfile") == 0) {
@@ -40,13 +44,14 @@ Store* CreateStore(const std::string& backend) {
 
 #ifdef USE_OPENCV
   if (backend == "imagefolder") {
-    return new ImageFolderStore();
+    store = new ImageFolderStore();
   }
 #endif
 
 #ifdef USE_HDFS
-  if (backend == "hdfs") {
-    return new HDFSStore();
+  // "hdfsfile" is the backend name; the former name "hdfs" is still accepted.
+  if (backend == "hdfsfile" || backend == "hdfs") {
+    store = new HDFSStore();
   }
 #endif
   return store;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/8a07a294/src/io/textfile_store.cc
----------------------------------------------------------------------
diff --git a/src/io/textfile_store.cc b/src/io/textfile_store.cc
index e203517..4c2f1b9 100644
--- a/src/io/textfile_store.cc
+++ b/src/io/textfile_store.cc
@@ -70,6 +70,13 @@ void TextFileStore::SeekToFirst() {
   fs_->seekg(0);
 }
 
+// Seeking to an arbitrary offset is not implemented for text files, so the
+// offset is ignored; say so once instead of dropping the request silently.
+void TextFileStore::Seek(int offset) {
+  LOG_FIRST_N(WARNING, 1) << "TextFileStore does not support Seek; ignoring "
+                          << "offset " << offset;
+}
+
 bool TextFileStore::Write(const std::string& key, const std::string& value) {
   CHECK_NE(mode_, kRead);
   CHECK(fs_ != nullptr);