You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/06/29 04:44:24 UTC
incubator-singa git commit: SINGA-211 Add TextFileReader and
TextFileWriter for CSV files
Repository: incubator-singa
Updated Branches:
refs/heads/dev 4db968c2e -> d3c1bae61
SINGA-211 Add TextFileReader and TextFileWriter for CSV files
Add TextFileReader and TextFileWriter for reading and writing csv files.
Pass a simple test for these two classes.
Fixed bugs in BinFileReader and BinFileWriter.
Add a constructor for BinFile
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d3c1bae6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d3c1bae6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d3c1bae6
Branch: refs/heads/dev
Commit: d3c1bae61253efa58afab9d5d7a7ef8a6a2596d2
Parents: 4db968c
Author: XiangruiCAI <ca...@gmail.com>
Authored: Tue Jun 28 16:09:21 2016 +0800
Committer: XiangruiCAI <ca...@gmail.com>
Committed: Tue Jun 28 21:39:27 2016 +0800
----------------------------------------------------------------------
include/singa/io/reader.h | 40 +++++++++++++---
include/singa/io/writer.h | 47 +++++++++++++------
src/io/binfile_reader.cc | 19 ++++++--
src/io/binfile_writer.cc | 70 +++++++++------------------
src/io/textfile_reader.cc | 63 +++++++++++++++++++++++++
src/io/textfile_writer.cc | 61 ++++++++++++++++++++++++
test/singa/test_binfile_rw.cc | 10 ++--
test/singa/test_textfile_rw.cc | 94 +++++++++++++++++++++++++++++++++++++
8 files changed, 327 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/include/singa/io/reader.h
----------------------------------------------------------------------
diff --git a/include/singa/io/reader.h b/include/singa/io/reader.h
index f693da2..bd1b3fe 100644
--- a/include/singa/io/reader.h
+++ b/include/singa/io/reader.h
@@ -40,7 +40,7 @@ class Reader {
/// path is the path to the storage, could be a file path, database
/// connection, or hdfs path.
/// return true if open successfully, otherwise false.
- virtual bool Open(const std::string& path, int capacity = 10485760) = 0;
+ virtual bool Open(const std::string& path) = 0;
/// Release resources.
virtual void Close() = 0;
@@ -54,11 +54,14 @@ class Reader {
virtual int Count() = 0;
};
+/// BinFileReader reads key-value tuples from a binary file.
class BinFileReader : public Reader {
public:
~BinFileReader() { Close(); }
/// \copydoc Open(const std::string& path)
- bool Open(const std::string& path, int capacity = 10485760) override;
+ bool Open(const std::string& path) override;
+ /// \copydoc Open(const std::string& path), user defines capacity
+ bool Open(const std::string& path, int capacity);
/// \copydoc Close()
void Close() override;
/// \copydoc Read(std::string* key, std::string* value)
@@ -69,30 +72,55 @@ class BinFileReader : public Reader {
inline std::string path() { return path_; }
protected:
+ /// Open a file with path_ and initialize buf_
+ bool OpenFile();
/// Read the next field, including content_len and content;
/// return true if succeed.
bool ReadField(std::string* content);
-
/// Read data from disk if the current data in the buffer is not a full field.
/// size is the size of the next field.
bool PrepareNextField(int size);
private:
- std::string path_ = "";
/// file to be read
+ std::string path_ = "";
+ /// ifstream
std::ifstream fdat_;
/// internal buffer
char* buf_ = nullptr;
/// offset inside the buf_
int offset_ = 0;
- /// allocated bytes for the buf_
- int capacity_ = 0;
+ /// allocated bytes for the buf_, default is 10M
+ int capacity_ = 10485760;
/// bytes in buf_
int bufsize_ = 0;
/// magic word
const char kMagicWord[2] = {'s', 'g'};
};
+/// TextFileReader reads a text (e.g. CSV) file line by line.
+/// Each tuple returned by Read() is one line of the file: the key is the
+/// zero-based line number and the value is the line content.
+class TextFileReader : public Reader {
+ public:
+  /// Close the file, if it is still open, on destruction.
+  ~TextFileReader() { Close(); }
+  /// \copydoc Open(const std::string& path)
+  bool Open(const std::string& path) override;
+  /// \copydoc Close()
+  void Close() override;
+  /// \copydoc Read(std::string* key, std::string* value)
+  /// key receives the line number as a decimal string, value the line text.
+  bool Read(std::string* key, std::string* value) override;
+  /// \copydoc Count()
+  /// Counts the non-empty lines of the file.
+  int Count() override;
+  /// Return the path passed to the last Open() call.
+  std::string path() { return path_; }
+
+ private:
+  /// Path of the text file to read.
+  std::string path_;
+  /// Input stream over the file at path_.
+  std::ifstream fdat_;
+  /// Line counter; its current value is the key of the next Read().
+  int lineNo_ = 0;
+};
} // namespace io
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/include/singa/io/writer.h
----------------------------------------------------------------------
diff --git a/include/singa/io/writer.h b/include/singa/io/writer.h
index a847ead..f20a22b 100644
--- a/include/singa/io/writer.h
+++ b/include/singa/io/writer.h
@@ -44,11 +44,8 @@ class Writer {
/// - a path to local directory. This is to be compatible with the older
/// version (DataShard). The KVFile is shard.dat under that directory
/// - a hdfs file starting with "hdfs://"
- /// mode is KVFile open mode(kCreate, kAppend).
- /// buffer Caches capacity bytes data for every disk op (read or write),
- /// default is 10MB.
- virtual bool Open(const std::string &path, Mode mode,
- int capacity = 10485760) = 0;
+ /// mode is open mode(kCreate, kAppend).
+ virtual bool Open(const std::string &path, Mode mode) = 0;
/// Release resources.
virtual void Close() = 0;
@@ -73,9 +70,10 @@ class Writer {
class BinFileWriter : public Writer {
public:
~BinFileWriter() { Close(); }
- /// \copydoc Open(const std::string &path, Mode mode, int bufsize = 10485760)
- bool Open(const std::string &path, Mode mode,
- int capacity = 10485760) override;
+ /// \copydoc Open(const std::string &path, Mode mode)
+ bool Open(const std::string &path, Mode mode) override;
+ /// \copydoc Open(const std::string &path, Mode mode), user defines capacity
+ bool Open(const std::string& path, Mode mode, int capacity);
/// \copydoc Close()
void Close() override;
/// \copydoc Write(const std::string& key, const std::string& value) override;
@@ -86,26 +84,47 @@ class BinFileWriter : public Writer {
inline std::string path() { return path_; }
protected:
- /// Setup the disk pointer to the right position for append in case that
- /// the pervious write crashes.
- /// return offset (end pos) of the last success written record.
- int PrepareForAppend(const std::string &path);
+ /// Open a file with path_ and initialize buf_
+ bool OpenFile();
private:
+ /// file to be written
std::string path_ = "";
Mode mode_;
- /// file to be written
+ /// ofstream
std::ofstream fdat_;
/// internal buffer
char *buf_ = nullptr;
/// allocated bytes for the buf_
- int capacity_ = 0;
+ int capacity_ = 10485760;
/// bytes in buf_
int bufsize_ = 0;
/// magic word
const char kMagicWord[2]= {'s', 'g'};
};
+/// TextFileWriter writes tuples to a text (e.g. CSV) file, one value per line.
+class TextFileWriter : public Writer {
+ public:
+  /// Flush and close the file, if it is still open, on destruction.
+  ~TextFileWriter() { Close(); }
+  /// \copydoc Open(const std::string &path, Mode mode)
+  bool Open(const std::string &path, Mode mode) override;
+  /// \copydoc Close()
+  void Close() override;
+  /// \copydoc Write(const std::string& key, const std::string& value)
+  /// Only value is written to the file; key is not stored.
+  bool Write(const std::string &key, const std::string &value) override;
+  /// \copydoc Flush()
+  void Flush() override;
+  /// Return the path passed to the last Open() call.
+  std::string path() { return path_; }
+
+ private:
+  /// Path of the text file to write.
+  std::string path_;
+  /// Mode passed to the last Open() call.
+  Mode mode_;
+  /// Output stream over the file at path_.
+  std::ofstream fdat_;
+};
} // namespace io
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/binfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/io/binfile_reader.cc b/src/io/binfile_reader.cc
index 6a29540..d54eeb5 100644
--- a/src/io/binfile_reader.cc
+++ b/src/io/binfile_reader.cc
@@ -21,13 +21,15 @@
namespace singa {
namespace io {
+/// Open the binary file at `path`, keeping the current value of capacity_
+/// for the read buffer. Returns whatever OpenFile() returns.
+bool BinFileReader::Open(const std::string& path) {
+  path_ = path;
+  const bool opened = OpenFile();
+  return opened;
+}
+
bool BinFileReader::Open(const std::string& path, int capacity) {
path_ = path;
capacity_ = capacity;
- buf_ = new char[capacity_];
- fdat_.open(path_, std::ios::in | std::ios::binary);
- CHECK(fdat_.is_open()) << "Cannot open file " << path_;
- return fdat_.is_open();
+ return OpenFile();
}
void BinFileReader::Close() {
@@ -57,7 +59,7 @@ bool BinFileReader::Read(std::string* key, std::string* value) {
int BinFileReader::Count() {
std::ifstream fin(path_, std::ios::in | std::ios::binary);
- CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+ CHECK(fin.is_open()) << "Cannot create file " << path_;
int count = 0;
while (true) {
size_t len;
@@ -80,6 +82,13 @@ int BinFileReader::Count() {
return count;
}
+/// Allocate a read buffer of capacity_ bytes and open path_ as a binary
+/// input stream. The open is guarded by CHECK; returns true if the stream
+/// is open afterwards.
+bool BinFileReader::OpenFile() {
+  buf_ = new char[capacity_];
+  const std::ios::openmode flags = std::ios::binary | std::ios::in;
+  fdat_.open(path_, flags);
+  CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+  return fdat_.is_open();
+}
+
bool BinFileReader::ReadField(std::string* content) {
content->clear();
int ssize = sizeof(size_t);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/binfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/io/binfile_writer.cc b/src/io/binfile_writer.cc
index b1d7951..e207453 100644
--- a/src/io/binfile_writer.cc
+++ b/src/io/binfile_writer.cc
@@ -21,33 +21,19 @@
namespace singa {
namespace io {
+/// Open `path` in `mode` using the current buffer capacity (capacity_).
+/// Returns true if the file is open afterwards.
+bool BinFileWriter::Open(const std::string& path, Mode mode) {
+  // Same guard as the capacity overload: refuse to reopen an open writer.
+  CHECK(!fdat_.is_open());
+  path_ = path;
+  mode_ = mode;
+  // OpenFile() allocates buf_ itself; allocating it here as well leaked one
+  // buffer of capacity_ bytes on every call.
+  return OpenFile();
+}
+
bool BinFileWriter::Open(const std::string& path, Mode mode, int capacity) {
CHECK(!fdat_.is_open());
path_ = path;
mode_ = mode;
capacity_ = capacity;
- buf_ = new char[capacity_];
- switch (mode) {
- case kCreate:
- fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
- CHECK(fdat_.is_open()) << "Cannot create file " << path_;
- break;
- case kAppend:
- fdat_.open(path_, std::ios::in | std::ios::binary);
- CHECK(fdat_.is_open()) << "Cannot open file " << path_;
- fdat_.close();
- {
- int last_tuple = PrepareForAppend(path_);
- fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::in |
- std::ios::ate);
- fdat_.seekp(last_tuple);
- }
- break;
- default:
- LOG(FATAL) << "unknown model to open KVFile " << mode;
- break;
- }
- return fdat_.is_open();
+ return OpenFile();
}
void BinFileWriter::Close() {
@@ -98,7 +84,6 @@ bool BinFileWriter::Write(const std::string& key, const std::string& value) {
}
void BinFileWriter::Flush() {
- CHECK(fdat_);
if (bufsize_ > 0) {
fdat_.write(buf_, bufsize_);
fdat_.flush();
@@ -106,31 +91,22 @@ void BinFileWriter::Flush() {
}
}
-int BinFileWriter::PrepareForAppend(const std::string& path) {
- std::ifstream fin(path, std::ios::in | std::ios::binary);
- if (!fin.is_open()) return 0;
- int last_tuple_offset = 0;
- char buf[256];
- size_t len;
- char magic[4];
- while (true) {
- fin.read(magic, sizeof(magic));
- if (!fin.good()) break;
- if (magic[2] == 1) {
- fin.read(reinterpret_cast<char*>(&len), sizeof(len));
- if (!fin.good()) break;
- fin.read(buf, len);
- buf[len] = '\0';
- if (!fin.good()) break;
- }
- fin.read(reinterpret_cast<char*>(&len), sizeof(len));
- if (!fin.good()) break;
- fin.seekg(len, std::ios_base::cur);
- if (!fin.good()) break;
- last_tuple_offset = fin.tellg();
+bool BinFileWriter::OpenFile() {
+ buf_ = new char[capacity_];
+ switch (mode_) {
+ case kCreate:
+ fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
+ CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+ break;
+ case kAppend:
+ fdat_.open(path_, std::ios::app | std::ios::binary);
+ CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+ break;
+ default:
+ LOG(FATAL) << "unknown mode to open binary file " << mode_;
+ break;
}
- fin.close();
- return last_tuple_offset;
+ return fdat_.is_open();
}
} // namespace io
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/textfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/io/textfile_reader.cc b/src/io/textfile_reader.cc
new file mode 100644
index 0000000..7612241
--- /dev/null
+++ b/src/io/textfile_reader.cc
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "singa/io/reader.h"
+#include "singa/utils/logging.h"
+
+namespace singa {
+namespace io {
+/// Open the text file at `path` for reading and restart line numbering.
+/// The open is guarded by CHECK; returns true if the stream is open.
+bool TextFileReader::Open(const std::string& path) {
+  path_ = path;
+  // Keys returned by Read() are line numbers of the file being read, so a
+  // reader that is closed and opened again must start counting from 0.
+  lineNo_ = 0;
+  fdat_.open(path_, std::ios::in);
+  CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+  return fdat_.is_open();
+}
+
+/// Close the input stream; does nothing when no file is open.
+void TextFileReader::Close() {
+  if (!fdat_.is_open()) return;
+  fdat_.close();
+}
+
+/// Read the next line of the file.
+/// On success *value holds the line and *key the current line counter as a
+/// decimal string; the counter is then advanced. Returns false once the end
+/// of the file has been reached. Any other read failure is logged as FATAL.
+bool TextFileReader::Read(std::string* key, std::string* value) {
+  CHECK(fdat_.is_open()) << "File not open!";
+  key->clear();
+  value->clear();
+  const bool got_line = static_cast<bool>(std::getline(fdat_, *value));
+  if (!got_line && fdat_.eof()) return false;
+  if (!got_line) LOG(FATAL) << "Error in reading text file";
+  *key = std::to_string(lineNo_);
+  ++lineNo_;
+  return true;
+}
+
+/// Count the non-empty lines of the file at path_.
+/// A separate stream is used, so the read position of fdat_ is unaffected.
+int TextFileReader::Count() {
+  std::ifstream fin(path_, std::ios::in);
+  CHECK(fin.is_open()) << "Cannot open file " << path_;
+  int count = 0;
+  std::string line;
+  // Loop on the getline() result rather than on eof(): a read error sets
+  // failbit/badbit without eofbit, which made an eof()-controlled loop spin
+  // forever while re-counting the previous line.
+  while (std::getline(fin, line)) {
+    if (!line.empty()) ++count;
+  }
+  return count;  // fin is closed by its destructor
+}
+
+} // namespace io
+} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/textfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/io/textfile_writer.cc b/src/io/textfile_writer.cc
new file mode 100644
index 0000000..7868b85
--- /dev/null
+++ b/src/io/textfile_writer.cc
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "singa/io/writer.h"
+#include "singa/utils/logging.h"
+
+namespace singa {
+namespace io {
+/// Open the text file at `path` for writing.
+/// kCreate truncates (or creates) the file, kAppend opens it in append mode;
+/// any other mode is logged as FATAL. The writer must not already be open.
+/// Returns true if the stream is open afterwards.
+bool TextFileWriter::Open(const std::string& path, Mode mode) {
+  CHECK(!fdat_.is_open());
+  path_ = path;
+  mode_ = mode;
+  if (mode == kCreate) {
+    fdat_.open(path_, std::ios::out | std::ios::trunc);
+    CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+  } else if (mode == kAppend) {
+    fdat_.open(path_, std::ios::app);
+    CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+  } else {
+    LOG(FATAL) << "unknown mode to open text file " << mode;
+  }
+  return fdat_.is_open();
+}
+
+/// Flush pending output, then close the stream if it is open.
+void TextFileWriter::Close() {
+  Flush();
+  if (!fdat_.is_open()) return;
+  fdat_.close();
+}
+
+/// Append `value` as one line of the text file.
+/// `key` is accepted for interface compatibility but is not written.
+/// Returns false for an empty value or if the stream is in a failed state
+/// after the write; true otherwise.
+bool TextFileWriter::Write(const std::string& key, const std::string& value) {
+  CHECK(fdat_.is_open()) << "File not open!";
+  if (value.empty()) return false;
+  // '\n' instead of std::endl: std::endl forces a flush after every record.
+  // Buffered data is written out by Flush() and by Close().
+  fdat_ << value << '\n';
+  // Report stream failures instead of unconditionally returning true.
+  return fdat_.good();
+}
+
+/// Flush buffered output to the file; does nothing when no file is open.
+void TextFileWriter::Flush() {
+  if (!fdat_.is_open()) return;
+  fdat_.flush();
+}
+} // namespace io
+} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/test/singa/test_binfile_rw.cc
----------------------------------------------------------------------
diff --git a/test/singa/test_binfile_rw.cc b/test/singa/test_binfile_rw.cc
index 45afd56..ddee4f1 100644
--- a/test/singa/test_binfile_rw.cc
+++ b/test/singa/test_binfile_rw.cc
@@ -23,13 +23,13 @@
#include "../include/singa/io/writer.h"
#include "gtest/gtest.h"
-const char* path = "./binfile_test";
+const char* path_bin = "./binfile_test";
using singa::io::BinFileReader;
using singa::io::BinFileWriter;
TEST(BinFileWriter, Create) {
BinFileWriter writer;
bool ret;
- ret = writer.Open(path, singa::io::kCreate);
+ ret = writer.Open(path_bin, singa::io::kCreate);
EXPECT_EQ(true, ret);
std::string key = "";
@@ -47,7 +47,7 @@ TEST(BinFileWriter, Create) {
TEST(BinFileWriter, Append) {
BinFileWriter writer;
bool ret;
- ret = writer.Open(path, singa::io::kAppend);
+ ret = writer.Open(path_bin, singa::io::kAppend, 20971520);
EXPECT_EQ(true, ret);
std::string key = "1";
@@ -67,7 +67,7 @@ TEST(BinFileWriter, Append) {
TEST(BinFileReader, Read) {
BinFileReader reader;
bool ret;
- ret = reader.Open(path);
+ ret = reader.Open(path_bin);
EXPECT_EQ(true, ret);
int cnt = reader.Count();
@@ -91,5 +91,5 @@ TEST(BinFileReader, Read) {
EXPECT_STREQ("\nThis is another test for binfile io.", value.c_str());
reader.Close();
- remove(path);
+ remove(path_bin);
}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/test/singa/test_textfile_rw.cc
----------------------------------------------------------------------
diff --git a/test/singa/test_textfile_rw.cc b/test/singa/test_textfile_rw.cc
new file mode 100644
index 0000000..7494f46
--- /dev/null
+++ b/test/singa/test_textfile_rw.cc
@@ -0,0 +1,94 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "../include/singa/io/reader.h"
+#include "../include/singa/io/writer.h"
+#include "gtest/gtest.h"
+
+const char* path_csv = "./textfile_test.csv";
+using singa::io::TextFileReader;
+using singa::io::TextFileWriter;
+// Creates the test file and writes the same line twice.
+TEST(TextFileWriter, Create) {
+  TextFileWriter writer;
+  EXPECT_TRUE(writer.Open(path_csv, singa::io::kCreate));
+
+  const std::string key = "";
+  const std::string value = "This is a test for binfile io.";
+  EXPECT_TRUE(writer.Write(key, value));
+  EXPECT_TRUE(writer.Write(key, value));
+
+  writer.Flush();
+  writer.Close();
+}
+
+// Opens the test file in append mode and writes two more lines.
+TEST(TextFileWriter, Append) {
+  TextFileWriter writer;
+  EXPECT_TRUE(writer.Open(path_csv, singa::io::kAppend));
+
+  const std::string value = "This is another test for binfile io.";
+  EXPECT_TRUE(writer.Write("1", value));
+  EXPECT_TRUE(writer.Write("2", value));
+
+  writer.Flush();
+  writer.Close();
+}
+// Reads the test file back: expects four lines keyed "0".."3", then
+// end-of-file, and finally removes the file.
+TEST(TextFileReader, Read) {
+  TextFileReader reader;
+  EXPECT_TRUE(reader.Open(path_csv));
+
+  EXPECT_EQ(4, reader.Count());
+
+  std::string key, value;
+  // Check the result of every Read(): a failed read would otherwise only
+  // show up as a confusing string mismatch on the cleared key/value.
+  EXPECT_TRUE(reader.Read(&key, &value));
+  EXPECT_STREQ("0", key.c_str());
+  EXPECT_STREQ("This is a test for binfile io.", value.c_str());
+
+  EXPECT_TRUE(reader.Read(&key, &value));
+  EXPECT_STREQ("1", key.c_str());
+  EXPECT_STREQ("This is a test for binfile io.", value.c_str());
+
+  EXPECT_TRUE(reader.Read(&key, &value));
+  EXPECT_STREQ("2", key.c_str());
+  EXPECT_STREQ("This is another test for binfile io.", value.c_str());
+
+  EXPECT_TRUE(reader.Read(&key, &value));
+  EXPECT_STREQ("3", key.c_str());
+  EXPECT_STREQ("This is another test for binfile io.", value.c_str());
+
+  // All four lines are consumed, so the next read must report end-of-file.
+  EXPECT_FALSE(reader.Read(&key, &value));
+
+  reader.Close();
+  remove(path_csv);
+}