You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/06/29 04:44:24 UTC

incubator-singa git commit: SINGA-211 Add TextFileReader and TextFileWriter for CSV files

Repository: incubator-singa
Updated Branches:
  refs/heads/dev 4db968c2e -> d3c1bae61


SINGA-211 Add TextFileReader and TextFileWriter for CSV files

Add TextFileReader and TextFileWriter for reading and writing CSV files.
Both classes pass a simple read/write test.

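Fix bugs in BinFileReader and BinFileWriter.
Add an Open() overload without the capacity argument to BinFileReader and
BinFileWriter (the diff adds no constructor; capacity now defaults to 10 MB).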


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d3c1bae6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d3c1bae6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d3c1bae6

Branch: refs/heads/dev
Commit: d3c1bae61253efa58afab9d5d7a7ef8a6a2596d2
Parents: 4db968c
Author: XiangruiCAI <ca...@gmail.com>
Authored: Tue Jun 28 16:09:21 2016 +0800
Committer: XiangruiCAI <ca...@gmail.com>
Committed: Tue Jun 28 21:39:27 2016 +0800

----------------------------------------------------------------------
 include/singa/io/reader.h      | 40 +++++++++++++---
 include/singa/io/writer.h      | 47 +++++++++++++------
 src/io/binfile_reader.cc       | 19 ++++++--
 src/io/binfile_writer.cc       | 70 +++++++++------------------
 src/io/textfile_reader.cc      | 63 +++++++++++++++++++++++++
 src/io/textfile_writer.cc      | 61 ++++++++++++++++++++++++
 test/singa/test_binfile_rw.cc  | 10 ++--
 test/singa/test_textfile_rw.cc | 94 +++++++++++++++++++++++++++++++++++++
 8 files changed, 327 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/include/singa/io/reader.h
----------------------------------------------------------------------
diff --git a/include/singa/io/reader.h b/include/singa/io/reader.h
index f693da2..bd1b3fe 100644
--- a/include/singa/io/reader.h
+++ b/include/singa/io/reader.h
@@ -40,7 +40,7 @@ class Reader {
   /// path is the path to the storage, could be a file path, database
   /// connection, or hdfs path.
   /// return true if open successfully, otherwise false.
-  virtual bool Open(const std::string& path, int capacity = 10485760) = 0;
+  virtual bool Open(const std::string& path) = 0;
 
   /// Release resources.
   virtual void Close() = 0;
@@ -54,11 +54,14 @@ class Reader {
   virtual int Count() = 0;
 };
 
+/// Binfilereader reads tuples from binary file with key-value pairs.
 class BinFileReader : public Reader {
  public:
   ~BinFileReader() { Close(); }
   /// \copydoc Open(const std::string& path)
-  bool Open(const std::string& path, int capacity = 10485760) override;
+  bool Open(const std::string& path) override;
+  /// \copydoc Open(const std::string& path), user defines capacity
+  bool Open(const std::string& path, int capacity);
   /// \copydoc Close()
   void Close() override;
   /// \copydoc Read(std::string* key, std::string* value)
@@ -69,30 +72,55 @@ class BinFileReader : public Reader {
   inline std::string path() { return path_; }
 
  protected:
+  /// Open a file with path_ and initialize buf_
+  bool OpenFile();
   /// Read the next filed, including content_len and content;
   /// return true if succeed.
   bool ReadField(std::string* content);
-
   /// Read data from disk if the current data in the buffer is not a full field.
   /// size is the size of the next field.
   bool PrepareNextField(int size);
 
  private:
-  std::string path_ = "";
   /// file to be read
+  std::string path_ = "";
+  /// ifstream
   std::ifstream fdat_;
   /// internal buffer
   char* buf_ = nullptr;
   /// offset inside the buf_
   int offset_ = 0;
-  /// allocated bytes for the buf_
-  int capacity_ = 0;
+  /// allocated bytes for the buf_, default is 10M
+  int capacity_ = 10485760;
   /// bytes in buf_
   int bufsize_ = 0;
   /// magic word
   const char kMagicWord[2] = {'s', 'g'};
 };
 
+/// TextFileReader reads tuples from CSV file.
+class TextFileReader : public Reader {
+ public:
+  ~TextFileReader() { Close(); }
+  /// \copydoc Open(const std::string& path)
+  bool Open(const std::string& path) override;
+  /// \copydoc Close()
+  void Close() override;
+  /// \copydoc Read(std::string* key, std::string* value)
+  bool Read(std::string* key, std::string* value) override;
+  /// \copydoc Count()
+  int Count() override;
+  /// return path to text file
+  inline std::string path() { return path_; }
+
+ private:
+  /// file to be read
+  std::string path_ = "";
+  /// ifstream
+  std::ifstream fdat_;
+  /// current line number
+  int lineNo_ = 0;
+};
 }  // namespace io
 }  // namespace singa
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/include/singa/io/writer.h
----------------------------------------------------------------------
diff --git a/include/singa/io/writer.h b/include/singa/io/writer.h
index a847ead..f20a22b 100644
--- a/include/singa/io/writer.h
+++ b/include/singa/io/writer.h
@@ -44,11 +44,8 @@ class Writer {
   ///  - a path to local directory. This is to be compatible with the older
   ///    version (DataShard). The KVFile is shard.dat under that directory
   ///  - a hdfs file starting with "hdfs://"
-  /// mode is KVFile open mode(kCreate, kAppend).
-  /// buffer Caches capacity bytes data for every disk op (read or write),
-  /// default is 10MB.
-  virtual bool Open(const std::string &path, Mode mode,
-                    int capacity = 10485760) = 0;
+  /// mode is open mode(kCreate, kAppend).
+  virtual bool Open(const std::string &path, Mode mode) = 0;
 
   /// Release resources.
   virtual void Close() = 0;
@@ -73,9 +70,10 @@ class Writer {
 class BinFileWriter : public Writer {
  public:
   ~BinFileWriter() { Close(); }
-  /// \copydoc Open(const std::string &path, Mode mode, int bufsize = 10485760)
-  bool Open(const std::string &path, Mode mode,
-            int capacity = 10485760) override;
+  /// \copydoc Open(const std::string &path, Mode mode)
+  bool Open(const std::string &path, Mode mode) override;
+  /// \copydoc Open(const std::string& path), user defines capacity
+  bool Open(const std::string& path, Mode mode, int capacity);
   /// \copydoc Close()
   void Close() override;
   /// \copydoc Write(const std::string& key, const std::string& value) override;
@@ -86,26 +84,47 @@ class BinFileWriter : public Writer {
   inline std::string path() { return path_; }
 
  protected:
-  /// Setup the disk pointer to the right position for append in case that
-  /// the pervious write crashes.
-  /// return offset (end pos) of the last success written record.
-  int PrepareForAppend(const std::string &path);
+  /// Open a file with path_ and initialize buf_
+  bool OpenFile();
 
  private:
+  /// file to be written
   std::string path_ = "";
   Mode mode_;
-  /// file to be written
+  /// ofstream
   std::ofstream fdat_;
   /// internal buffer
   char *buf_ = nullptr;
   /// allocated bytes for the buf_
-  int capacity_ = 0;
+  int capacity_ = 10485760;
   /// bytes in buf_
   int bufsize_ = 0;
   /// magic word
   const char kMagicWord[2]= {'s', 'g'};
 };
 
+/// TextFileWriter write training/validation/test tuples in CSV file.
+class TextFileWriter : public Writer {
+ public:
+  ~TextFileWriter() { Close(); }
+  /// \copydoc Open(const std::string &path, Mode mode)
+  bool Open(const std::string &path, Mode mode) override;
+  /// \copydoc Close()
+  void Close() override;
+  /// \copydoc Write(const std::string& key, const std::string& value) override;
+  bool Write(const std::string &key, const std::string &value) override;
+  /// \copydoc Flush()
+  void Flush() override;
+  /// return path to text file
+  inline std::string path() { return path_; }
+
+ private:
+  /// file to be written
+  std::string path_ = "";
+  Mode mode_;
+  /// ofstream
+  std::ofstream fdat_;
+};
 }  // namespace io
 }  // namespace singa
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/binfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/io/binfile_reader.cc b/src/io/binfile_reader.cc
index 6a29540..d54eeb5 100644
--- a/src/io/binfile_reader.cc
+++ b/src/io/binfile_reader.cc
@@ -21,13 +21,15 @@
 
 namespace singa {
 namespace io {
+bool BinFileReader::Open(const std::string& path) {
+  path_ = path;
+  return OpenFile();
+}
+
 bool BinFileReader::Open(const std::string& path, int capacity) {
   path_ = path;
   capacity_ = capacity;
-  buf_ = new char[capacity_];
-  fdat_.open(path_, std::ios::in | std::ios::binary);
-  CHECK(fdat_.is_open()) << "Cannot open file " << path_;
-  return fdat_.is_open();
+  return OpenFile();
 }
 
 void BinFileReader::Close() {
@@ -57,7 +59,7 @@ bool BinFileReader::Read(std::string* key, std::string* value) {
 
 int BinFileReader::Count() {
   std::ifstream fin(path_, std::ios::in | std::ios::binary);
-  CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+  CHECK(fin.is_open()) << "Cannot create file " << path_;
   int count = 0;
   while (true) {
     size_t len;
@@ -80,6 +82,13 @@ int BinFileReader::Count() {
   return count;
 }
 
+bool BinFileReader::OpenFile() {
+  buf_ = new char[capacity_];
+  fdat_.open(path_, std::ios::in | std::ios::binary);
+  CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+  return fdat_.is_open(); 
+}
+
 bool BinFileReader::ReadField(std::string* content) {
   content->clear();
   int ssize = sizeof(size_t);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/binfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/io/binfile_writer.cc b/src/io/binfile_writer.cc
index b1d7951..e207453 100644
--- a/src/io/binfile_writer.cc
+++ b/src/io/binfile_writer.cc
@@ -21,33 +21,19 @@
 
 namespace singa {
 namespace io {
+bool BinFileWriter::Open(const std::string& path, Mode mode) {
+  path_ = path;
+  mode_ = mode;
+  buf_ = new char[capacity_];
+  return OpenFile();
+}
+
 bool BinFileWriter::Open(const std::string& path, Mode mode, int capacity) {
   CHECK(!fdat_.is_open());
   path_ = path;
   mode_ = mode;
   capacity_ = capacity;
-  buf_ = new char[capacity_];
-  switch (mode) {
-    case kCreate:
-      fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
-      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
-      break;
-    case kAppend:
-      fdat_.open(path_, std::ios::in | std::ios::binary);
-      CHECK(fdat_.is_open()) << "Cannot open file " << path_;
-      fdat_.close();
-      {
-        int last_tuple = PrepareForAppend(path_);
-        fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::in |
-                              std::ios::ate);
-        fdat_.seekp(last_tuple);
-      }
-      break;
-    default:
-      LOG(FATAL) << "unknown model to open KVFile " << mode;
-      break;
-  }
-  return fdat_.is_open();
+  return OpenFile();
 }
 
 void BinFileWriter::Close() {
@@ -98,7 +84,6 @@ bool BinFileWriter::Write(const std::string& key, const std::string& value) {
 }
 
 void BinFileWriter::Flush() {
-  CHECK(fdat_);
   if (bufsize_ > 0) {
     fdat_.write(buf_, bufsize_);
     fdat_.flush();
@@ -106,31 +91,22 @@ void BinFileWriter::Flush() {
   }
 }
 
-int BinFileWriter::PrepareForAppend(const std::string& path) {
-  std::ifstream fin(path, std::ios::in | std::ios::binary);
-  if (!fin.is_open()) return 0;
-  int last_tuple_offset = 0;
-  char buf[256];
-  size_t len;
-  char magic[4];
-  while (true) {
-    fin.read(magic, sizeof(magic));
-    if (!fin.good()) break;
-    if (magic[2] == 1) {
-      fin.read(reinterpret_cast<char*>(&len), sizeof(len));
-      if (!fin.good()) break;
-      fin.read(buf, len);
-      buf[len] = '\0';
-      if (!fin.good()) break;
-    }
-    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
-    if (!fin.good()) break;
-    fin.seekg(len, std::ios_base::cur);
-    if (!fin.good()) break;
-    last_tuple_offset = fin.tellg();
+bool BinFileWriter::OpenFile() {
+  buf_ = new char[capacity_];
+  switch (mode_) {
+    case kCreate:
+      fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
+      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+      break;
+    case kAppend:
+      fdat_.open(path_, std::ios::app | std::ios::binary);
+      CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+      break;
+    default:
+      LOG(FATAL) << "unknown mode to open binary file " << mode_;
+      break;
   }
-  fin.close();
-  return last_tuple_offset;
+  return fdat_.is_open();
 }
 }  // namespace io
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/textfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/io/textfile_reader.cc b/src/io/textfile_reader.cc
new file mode 100644
index 0000000..7612241
--- /dev/null
+++ b/src/io/textfile_reader.cc
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "singa/io/reader.h"
+#include "singa/utils/logging.h"
+
+namespace singa {
+namespace io {
+bool TextFileReader::Open(const std::string& path) {
+  path_ = path;
+  fdat_.open(path_, std::ios::in);
+  CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+  return fdat_.is_open();
+}
+
+void TextFileReader::Close() {
+  if (fdat_.is_open()) fdat_.close();
+}
+
+bool TextFileReader::Read(std::string* key, std::string* value) {
+  CHECK(fdat_.is_open()) << "File not open!";
+  key->clear();
+  value->clear();
+  if (!std::getline(fdat_, *value)) {
+    if (fdat_.eof())
+      return false;
+    else
+      LOG(FATAL) << "Error in reading text file";
+  }
+  *key = std::to_string(lineNo_++);
+  return true;
+}
+
+int TextFileReader::Count() {
+  std::ifstream fin(path_, std::ios::in);
+  CHECK(fin.is_open()) << "Cannot create file " << path_;
+  int count = 0;
+  string line;
+  while (!fin.eof()) {
+    std::getline(fin, line);
+    if (line != "") count++;
+  }
+  fin.close();
+  return count;
+}
+
+}  // namespace io
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/textfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/io/textfile_writer.cc b/src/io/textfile_writer.cc
new file mode 100644
index 0000000..7868b85
--- /dev/null
+++ b/src/io/textfile_writer.cc
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "singa/io/writer.h"
+#include "singa/utils/logging.h"
+
+namespace singa {
+namespace io {
+bool TextFileWriter::Open(const std::string& path, Mode mode) {
+  CHECK(!fdat_.is_open());
+  path_ = path;
+  mode_ = mode;
+  switch (mode) {
+    case kCreate:
+      fdat_.open(path_, std::ios::out | std::ios::trunc);
+      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+      break;
+    case kAppend:
+      fdat_.open(path_, std::ios::app);
+      CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+      break;
+    default:
+      LOG(FATAL) << "unknown mode to open text file " << mode;
+      break;
+  }
+  return fdat_.is_open();
+}
+
+void TextFileWriter::Close() {
+  Flush();
+  if (fdat_.is_open()) fdat_.close();
+}
+
+bool TextFileWriter::Write(const std::string& key, const std::string& value) {
+  CHECK(fdat_.is_open()) << "File not open!";
+  if (value.size() == 0) return false;
+  fdat_ << value << std::endl;
+  return true;
+}
+
+void TextFileWriter::Flush() {
+  if (fdat_.is_open())
+    fdat_.flush();
+}
+}  // namespace io
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/test/singa/test_binfile_rw.cc
----------------------------------------------------------------------
diff --git a/test/singa/test_binfile_rw.cc b/test/singa/test_binfile_rw.cc
index 45afd56..ddee4f1 100644
--- a/test/singa/test_binfile_rw.cc
+++ b/test/singa/test_binfile_rw.cc
@@ -23,13 +23,13 @@
 #include "../include/singa/io/writer.h"
 #include "gtest/gtest.h"
 
-const char* path = "./binfile_test";
+const char* path_bin = "./binfile_test";
 using singa::io::BinFileReader;
 using singa::io::BinFileWriter;
 TEST(BinFileWriter, Create) {
   BinFileWriter writer;
   bool ret;
-  ret = writer.Open(path, singa::io::kCreate);
+  ret = writer.Open(path_bin, singa::io::kCreate);
   EXPECT_EQ(true, ret);
 
   std::string key = "";
@@ -47,7 +47,7 @@ TEST(BinFileWriter, Create) {
 TEST(BinFileWriter, Append) {
   BinFileWriter writer;
   bool ret;
-  ret = writer.Open(path, singa::io::kAppend);
+  ret = writer.Open(path_bin, singa::io::kAppend, 20971520);
   EXPECT_EQ(true, ret);
 
   std::string key = "1";
@@ -67,7 +67,7 @@ TEST(BinFileWriter, Append) {
 TEST(BinFileReader, Read) {
   BinFileReader reader;
   bool ret;
-  ret = reader.Open(path);
+  ret = reader.Open(path_bin);
   EXPECT_EQ(true, ret);
 
   int cnt = reader.Count();
@@ -91,5 +91,5 @@ TEST(BinFileReader, Read) {
   EXPECT_STREQ("\nThis is another test for binfile io.", value.c_str());
 
   reader.Close();
-  remove(path);
+  remove(path_bin);
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/test/singa/test_textfile_rw.cc
----------------------------------------------------------------------
diff --git a/test/singa/test_textfile_rw.cc b/test/singa/test_textfile_rw.cc
new file mode 100644
index 0000000..7494f46
--- /dev/null
+++ b/test/singa/test_textfile_rw.cc
@@ -0,0 +1,94 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "../include/singa/io/reader.h"
+#include "../include/singa/io/writer.h"
+#include "gtest/gtest.h"
+
+const char* path_csv = "./textfile_test.csv";
+using singa::io::TextFileReader;
+using singa::io::TextFileWriter;
+TEST(TextFileWriter, Create) {
+  TextFileWriter writer;
+  bool ret;
+  ret = writer.Open(path_csv, singa::io::kCreate);
+  EXPECT_EQ(true, ret);
+
+  std::string key = "";
+  std::string value = "This is a test for binfile io.";
+  ret = writer.Write(key, value);
+  EXPECT_EQ(true, ret);
+
+  ret = writer.Write(key, value);
+  EXPECT_EQ(true, ret);
+
+  writer.Flush();
+  writer.Close();
+}
+
+TEST(TextFileWriter, Append) {
+  TextFileWriter writer;
+  bool ret;
+  ret = writer.Open(path_csv, singa::io::kAppend);
+  EXPECT_EQ(true, ret);
+
+  std::string key = "1";
+  std::string value = "This is another test for binfile io.";
+  ret = writer.Write(key, value);
+  EXPECT_EQ(true, ret);
+
+  key = "2";
+  value = "This is another test for binfile io.";
+  ret = writer.Write(key, value);
+  EXPECT_EQ(true, ret);
+
+  writer.Flush();
+  writer.Close();
+}
+TEST(TextFileReader, Read) {
+  TextFileReader reader;
+  bool ret;
+  ret = reader.Open(path_csv);
+  EXPECT_EQ(true, ret);
+
+  int cnt = reader.Count();
+  EXPECT_EQ(4, cnt);
+
+  std::string key, value;
+  reader.Read(&key, &value);
+  EXPECT_STREQ("0", key.c_str());
+  EXPECT_STREQ("This is a test for binfile io.", value.c_str());
+
+  reader.Read(&key, &value);
+  EXPECT_STREQ("1", key.c_str());
+  EXPECT_STREQ("This is a test for binfile io.", value.c_str());
+
+  reader.Read(&key, &value);
+  EXPECT_STREQ("2", key.c_str());
+  EXPECT_STREQ("This is another test for binfile io.", value.c_str());
+
+  reader.Read(&key, &value);
+  EXPECT_STREQ("3", key.c_str());
+  EXPECT_STREQ("This is another test for binfile io.", value.c_str());
+
+  reader.Close();
+  remove(path_csv);
+}