You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/03/01 01:17:19 UTC
parquet-cpp git commit: PARQUET-520: Add MemoryMapSource and add unit
tests for both it and LocalFileSource
Repository: parquet-cpp
Updated Branches:
refs/heads/master 5b3e9c103 -> 41c1e6887
PARQUET-520: Add MemoryMapSource and add unit tests for both it and LocalFileSource
I also added the `file_descriptor` API so that we can verify that dtors elsewhere successfully close open files. Closes #56
Author: Wes McKinney <we...@apache.org>
Closes #66 from wesm/PARQUET-520 and squashes the following commits:
9d638ba [Wes McKinney] Add memory-mapping option to ParquetFileReader::OpenFile. Add --no-memory-map flag to parquet_reader
6389683 [Wes McKinney] Add Read API tests
dbf6a45 [Wes McKinney] Test some failure modes for LocalFileSource / MemoryMapSource
01a7d64 [Wes McKinney] Add a MemoryMapSource and use this by default for SerializedFileReader
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/41c1e688
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/41c1e688
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/41c1e688
Branch: refs/heads/master
Commit: 41c1e6887cc53d43d48c7d327ed04ee4dd1a57b4
Parents: 5b3e9c1
Author: Wes McKinney <we...@apache.org>
Authored: Mon Feb 29 16:17:16 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Mon Feb 29 16:17:16 2016 -0800
----------------------------------------------------------------------
example/parquet_reader.cc | 10 ++-
src/parquet/file/reader.cc | 10 ++-
src/parquet/file/reader.h | 3 +-
src/parquet/util/CMakeLists.txt | 2 +-
src/parquet/util/input-output-test.cc | 125 +++++++++++++++++++++++++++++
src/parquet/util/input.cc | 96 ++++++++++++++++++++--
src/parquet/util/input.h | 39 ++++++++-
src/parquet/util/output-test.cc | 46 -----------
8 files changed, 270 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index 8e498c7..8acc4a7 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -24,26 +24,30 @@ using namespace parquet_cpp;
int main(int argc, char** argv) {
if (argc > 3) {
- std::cerr << "Usage: parquet_reader [--only-stats] <file>"
+ std::cerr << "Usage: parquet_reader [--only-stats] [--no-memory-map] <file>"
<< std::endl;
return -1;
}
std::string filename;
bool print_values = true;
+ bool memory_map = true;
// Read command-line options
char *param, *value;
for (int i = 1; i < argc; i++) {
- if ( (param = std::strstr(argv[i], "--only-stats")) ) {
+ if ((param = std::strstr(argv[i], "--only-stats"))) {
print_values = false;
+ } else if ((param = std::strstr(argv[i], "--no-memory-map"))) {
+ memory_map = false;
} else {
filename = argv[i];
}
}
try {
- std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename);
+ std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename,
+ memory_map);
reader->DebugPrint(std::cout, print_values);
} catch (const std::exception& e) {
std::cerr << "Parquet error: "
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 4901471..fcbe453 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -67,8 +67,14 @@ RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
ParquetFileReader::ParquetFileReader() : schema_(nullptr) {}
ParquetFileReader::~ParquetFileReader() {}
-std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path) {
- std::unique_ptr<LocalFileSource> file(new LocalFileSource());
+std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path,
+ bool memory_map) {
+ std::unique_ptr<LocalFileSource> file;
+ if (memory_map) {
+ file.reset(new MemoryMapSource());
+ } else {
+ file.reset(new LocalFileSource());
+ }
file->Open(path);
auto contents = SerializedFile::Open(std::move(file));
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index fcc2c18..94f5931 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -89,7 +89,8 @@ class ParquetFileReader {
~ParquetFileReader();
// API Convenience to open a serialized Parquet file on disk
- static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path);
+ static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path,
+ bool memory_map = true);
void Open(std::unique_ptr<Contents> contents);
void Close();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index a009129..c8d2c2f 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -63,6 +63,6 @@ endif()
ADD_PARQUET_TEST(bit-util-test)
ADD_PARQUET_TEST(buffer-test)
+ADD_PARQUET_TEST(input-output-test)
ADD_PARQUET_TEST(mem-pool-test)
-ADD_PARQUET_TEST(output-test)
ADD_PARQUET_TEST(rle-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
new file mode 100644
index 0000000..424be3a
--- /dev/null
+++ b/src/parquet/util/input-output-test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <cstdio>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "parquet/exception.h"
+#include "parquet/util/buffer.h"
+#include "parquet/util/input.h"
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet_cpp {
+
+TEST(TestInMemoryOutputStream, Basics) {
+ std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
+
+ std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+
+ stream->Write(&data[0], 4);
+ ASSERT_EQ(4, stream->Tell());
+ stream->Write(&data[4], data.size() - 4);
+
+ std::shared_ptr<Buffer> buffer = stream->GetBuffer();
+
+ Buffer data_buf(data.data(), data.size());
+
+ ASSERT_TRUE(data_buf.Equals(*buffer));
+}
+
+static bool file_exists(const std::string& path) {
+ return std::ifstream(path.c_str()).good();
+}
+
+template <typename ReaderType>
+class TestFileReaders : public ::testing::Test {
+ public:
+ void SetUp() {
+ test_path_ = "parquet-input-output-test.txt";
+ if (file_exists(test_path_)) {
+ std::remove(test_path_.c_str());
+ }
+ test_data_ = "testingdata";
+
+ std::ofstream stream;
+ stream.open(test_path_.c_str());
+ stream << test_data_;
+ filesize_ = test_data_.size();
+ }
+
+ void TearDown() {
+ DeleteTestFile();
+ }
+
+ void DeleteTestFile() {
+ if (file_exists(test_path_)) {
+ std::remove(test_path_.c_str());
+ }
+ }
+
+ protected:
+ ReaderType source;
+ std::string test_path_;
+ std::string test_data_;
+ int filesize_;
+};
+
+typedef ::testing::Types<LocalFileSource, MemoryMapSource> ReaderTypes;
+
+TYPED_TEST_CASE(TestFileReaders, ReaderTypes);
+
+TYPED_TEST(TestFileReaders, NonExistentFile) {
+ ASSERT_THROW(this->source.Open("0xDEADBEEF.txt"), ParquetException);
+}
+
+TYPED_TEST(TestFileReaders, Read) {
+ this->source.Open(this->test_path_);
+
+ ASSERT_EQ(this->filesize_, this->source.Size());
+
+ std::shared_ptr<Buffer> buffer = this->source.Read(4);
+ ASSERT_EQ(4, buffer->size());
+ ASSERT_EQ(0, memcmp(this->test_data_.c_str(), buffer->data(), 4));
+
+ // Read past EOF
+ buffer = this->source.Read(10);
+ ASSERT_EQ(7, buffer->size());
+ ASSERT_EQ(0, memcmp(this->test_data_.c_str() + 4, buffer->data(), 7));
+}
+
+TYPED_TEST(TestFileReaders, FileDisappeared) {
+ this->source.Open(this->test_path_);
+ this->source.Seek(4);
+ this->DeleteTestFile();
+ this->source.Close();
+}
+
+TYPED_TEST(TestFileReaders, BadSeek) {
+ this->source.Open(this->test_path_);
+
+ ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException);
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
index a238ff6..897d81c 100644
--- a/src/parquet/util/input.cc
+++ b/src/parquet/util/input.cc
@@ -17,7 +17,9 @@
#include "parquet/util/input.h"
+#include <sys/mman.h>
#include <algorithm>
+#include <sstream>
#include <string>
#include "parquet/exception.h"
@@ -42,13 +44,32 @@ LocalFileSource::~LocalFileSource() {
void LocalFileSource::Open(const std::string& path) {
path_ = path;
- file_ = fopen(path_.c_str(), "r");
+ file_ = fopen(path_.c_str(), "rb");
+ if (file_ == nullptr || ferror(file_)) {
+ std::stringstream ss;
+ ss << "Unable to open file: " << path;
+ throw ParquetException(ss.str());
+ }
is_open_ = true;
- fseek(file_, 0L, SEEK_END);
- size_ = Tell();
+ SeekFile(0, SEEK_END);
+ size_ = LocalFileSource::Tell();
Seek(0);
}
+void LocalFileSource::SeekFile(int64_t pos, int origin) {
+ if (origin == SEEK_SET && (pos < 0 || pos >= size_)) {
+ std::stringstream ss;
+ ss << "Position " << pos << " is not in range.";
+ throw ParquetException(ss.str());
+ }
+
+ if (0 != fseek(file_, pos, origin)) {
+ std::stringstream ss;
+ ss << "File seek to position " << pos << " failed.";
+ throw ParquetException(ss.str());
+ }
+}
+
void LocalFileSource::Close() {
// Pure virtual
CloseFile();
@@ -62,7 +83,7 @@ void LocalFileSource::CloseFile() {
}
void LocalFileSource::Seek(int64_t pos) {
- fseek(file_, pos, SEEK_SET);
+ SeekFile(pos);
}
int64_t LocalFileSource::Size() const {
@@ -70,7 +91,15 @@ int64_t LocalFileSource::Size() const {
}
int64_t LocalFileSource::Tell() const {
- return ftell(file_);
+ int64_t position = ftell(file_);
+ if (position < 0) {
+ throw ParquetException("ftell failed, did the file disappear?");
+ }
+ return position;
+}
+
+int LocalFileSource::file_descriptor() const {
+ return fileno(file_);
}
int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) {
@@ -87,6 +116,63 @@ std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
}
return result;
}
+// ----------------------------------------------------------------------
+// MemoryMapSource methods
+
+MemoryMapSource::~MemoryMapSource() {
+ CloseFile();
+}
+
+void MemoryMapSource::Open(const std::string& path) {
+ LocalFileSource::Open(path);
+ data_ = reinterpret_cast<uint8_t*>(mmap(nullptr, size_, PROT_READ,
+ MAP_SHARED, fileno(file_), 0));
+ if (data_ == nullptr) {
+ throw ParquetException("Memory mapping file failed");
+ }
+ pos_ = 0;
+}
+
+void MemoryMapSource::Close() {
+ // Pure virtual
+ CloseFile();
+}
+
+void MemoryMapSource::CloseFile() {
+ if (data_ != nullptr) {
+ munmap(data_, size_);
+ }
+
+ LocalFileSource::CloseFile();
+}
+
+void MemoryMapSource::Seek(int64_t pos) {
+ if (pos < 0 || pos >= size_) {
+ std::stringstream ss;
+ ss << "Position " << pos << " is not in range.";
+ throw ParquetException(ss.str());
+ }
+
+ pos_ = pos;
+}
+
+int64_t MemoryMapSource::Tell() const {
+ return pos_;
+}
+
+int64_t MemoryMapSource::Read(int64_t nbytes, uint8_t* buffer) {
+ int64_t bytes_available = std::min(nbytes, size_ - pos_);
+ memcpy(buffer, data_ + pos_, bytes_available);
+ pos_ += bytes_available;
+ return bytes_available;
+}
+
+std::shared_ptr<Buffer> MemoryMapSource::Read(int64_t nbytes) {
+ int64_t bytes_available = std::min(nbytes, size_ - pos_);
+ auto result = std::make_shared<Buffer>(data_ + pos_, bytes_available);
+ pos_ += bytes_available;
+ return result;
+}
// ----------------------------------------------------------------------
// BufferReader
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
index 5f2bde3..80fb730 100644
--- a/src/parquet/util/input.h
+++ b/src/parquet/util/input.h
@@ -18,8 +18,8 @@
#ifndef PARQUET_UTIL_INPUT_H
#define PARQUET_UTIL_INPUT_H
-#include <stdio.h>
#include <cstdint>
+#include <cstdio>
#include <memory>
#include <string>
#include <vector>
@@ -58,7 +58,7 @@ class LocalFileSource : public RandomAccessSource {
LocalFileSource() : file_(nullptr), is_open_(false) {}
virtual ~LocalFileSource();
- void Open(const std::string& path);
+ virtual void Open(const std::string& path);
virtual void Close();
virtual int64_t Size() const;
@@ -73,14 +73,47 @@ class LocalFileSource : public RandomAccessSource {
bool is_open() const { return is_open_;}
const std::string& path() const { return path_;}
- private:
+ // Return the integer file descriptor
+ int file_descriptor() const;
+
+ protected:
void CloseFile();
+ void SeekFile(int64_t pos, int origin = SEEK_SET);
std::string path_;
FILE* file_;
bool is_open_;
};
+class MemoryMapSource : public LocalFileSource {
+ public:
+ MemoryMapSource() :
+ LocalFileSource(),
+ data_(nullptr),
+ pos_(0) {}
+
+ virtual ~MemoryMapSource();
+
+ virtual void Close();
+ virtual void Open(const std::string& path);
+
+ virtual int64_t Tell() const;
+ virtual void Seek(int64_t pos);
+
+ // Copy data from memory map into out (must be already allocated memory)
+ // @returns: actual number of bytes read
+ virtual int64_t Read(int64_t nbytes, uint8_t* out);
+
+ // Return a buffer referencing memory-map (no copy)
+ virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
+
+ private:
+ void CloseFile();
+
+ uint8_t* data_;
+ int64_t pos_;
+};
+
// ----------------------------------------------------------------------
// A file-like object that reads from virtual address space
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output-test.cc b/src/parquet/util/output-test.cc
deleted file mode 100644
index bae184a..0000000
--- a/src/parquet/util/output-test.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "parquet/util/buffer.h"
-#include "parquet/util/output.h"
-#include "parquet/util/test-common.h"
-
-namespace parquet_cpp {
-
-TEST(TestInMemoryOutputStream, Basics) {
- std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
-
- std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
-
- stream->Write(&data[0], 4);
- ASSERT_EQ(4, stream->Tell());
- stream->Write(&data[4], data.size() - 4);
-
- std::shared_ptr<Buffer> buffer = stream->GetBuffer();
-
- Buffer data_buf(data.data(), data.size());
-
- ASSERT_TRUE(data_buf.Equals(*buffer));
-}
-
-} // namespace parquet_cpp