You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/03/01 01:17:19 UTC

parquet-cpp git commit: PARQUET-520: Add MemoryMapSource and add unit tests for both it and LocalFileSource

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 5b3e9c103 -> 41c1e6887


PARQUET-520: Add MemoryMapSource and add unit tests for both it and LocalFileSource

I also added the `file_descriptor` API so that we can verify that dtors elsewhere successfully close open files. Closes #56

Author: Wes McKinney <we...@apache.org>

Closes #66 from wesm/PARQUET-520 and squashes the following commits:

9d638ba [Wes McKinney] Add memory-mapping option to ParquetFileReader::OpenFile. Add --no-memory-map flag to parquet_reader
6389683 [Wes McKinney] Add Read API tests
dbf6a45 [Wes McKinney] Test some failure modes for LocalFileSource / MemoryMapSource
01a7d64 [Wes McKinney] Add a MemoryMapSource and use this by default for SerializedFileReader


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/41c1e688
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/41c1e688
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/41c1e688

Branch: refs/heads/master
Commit: 41c1e6887cc53d43d48c7d327ed04ee4dd1a57b4
Parents: 5b3e9c1
Author: Wes McKinney <we...@apache.org>
Authored: Mon Feb 29 16:17:16 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Mon Feb 29 16:17:16 2016 -0800

----------------------------------------------------------------------
 example/parquet_reader.cc             |  10 ++-
 src/parquet/file/reader.cc            |  10 ++-
 src/parquet/file/reader.h             |   3 +-
 src/parquet/util/CMakeLists.txt       |   2 +-
 src/parquet/util/input-output-test.cc | 125 +++++++++++++++++++++++++++++
 src/parquet/util/input.cc             |  96 ++++++++++++++++++++--
 src/parquet/util/input.h              |  39 ++++++++-
 src/parquet/util/output-test.cc       |  46 -----------
 8 files changed, 270 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index 8e498c7..8acc4a7 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -24,26 +24,31 @@ using namespace parquet_cpp;
 
 int main(int argc, char** argv) {
-  if (argc > 3) {
-    std::cerr << "Usage: parquet_reader [--only-stats] <file>"
+  // A file name is required, plus at most the two optional flags.
+  if (argc < 2 || argc > 4) {
+    std::cerr << "Usage: parquet_reader [--only-stats] [--no-memory-map] <file>"
               << std::endl;
     return -1;
   }
 
   std::string filename;
   bool print_values = true;
+  bool memory_map = true;
 
   // Read command-line options
   char *param, *value;
   for (int i = 1; i < argc; i++) {
-    if ( (param = std::strstr(argv[i], "--only-stats")) ) {
+    if ((param = std::strstr(argv[i], "--only-stats"))) {
       print_values = false;
+    } else if ((param = std::strstr(argv[i], "--no-memory-map"))) {
+      memory_map = false;
     } else {
       filename = argv[i];
     }
   }
 
   try {
-    std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename);
+    std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename,
+        memory_map);
     reader->DebugPrint(std::cout, print_values);
   } catch (const std::exception& e) {
     std::cerr << "Parquet error: "

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 4901471..fcbe453 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -67,8 +67,14 @@ RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
 ParquetFileReader::ParquetFileReader() : schema_(nullptr) {}
 ParquetFileReader::~ParquetFileReader() {}
 
-std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path) {
-  std::unique_ptr<LocalFileSource> file(new LocalFileSource());
+std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path,
+    bool memory_map) {
+  std::unique_ptr<LocalFileSource> file;
+  if (memory_map) {
+    file.reset(new MemoryMapSource());
+  } else {
+    file.reset(new LocalFileSource());
+  }
   file->Open(path);
 
   auto contents = SerializedFile::Open(std::move(file));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index fcc2c18..94f5931 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -89,7 +89,8 @@ class ParquetFileReader {
   ~ParquetFileReader();
 
   // API Convenience to open a serialized Parquet file on disk
-  static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path);
+  static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path,
+      bool memory_map = true);
 
   void Open(std::unique_ptr<Contents> contents);
   void Close();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index a009129..c8d2c2f 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -63,6 +63,6 @@ endif()
 
 ADD_PARQUET_TEST(bit-util-test)
 ADD_PARQUET_TEST(buffer-test)
+ADD_PARQUET_TEST(input-output-test)  # LocalFileSource, MemoryMapSource, InMemoryOutputStream
 ADD_PARQUET_TEST(mem-pool-test)
-ADD_PARQUET_TEST(output-test)
 ADD_PARQUET_TEST(rle-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
new file mode 100644
index 0000000..424be3a
--- /dev/null
+++ b/src/parquet/util/input-output-test.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "parquet/exception.h"
+#include "parquet/util/buffer.h"
+#include "parquet/util/input.h"
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet_cpp {
+
+TEST(TestInMemoryOutputStream, Basics) {
+  std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
+
+  std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+
+  stream->Write(&data[0], 4);
+  ASSERT_EQ(4, stream->Tell());
+  stream->Write(&data[4], data.size() - 4);
+
+  std::shared_ptr<Buffer> buffer = stream->GetBuffer();
+
+  Buffer data_buf(data.data(), data.size());
+
+  ASSERT_TRUE(data_buf.Equals(*buffer));
+}
+
+static bool file_exists(const std::string& path) {
+  return std::ifstream(path.c_str()).good();
+}
+
+// Typed fixture instantiated once per source type: each test gets a freshly
+// written scratch file containing test_data_, which TearDown() deletes.
+template <typename ReaderType>
+class TestFileReaders : public ::testing::Test {
+ public:
+  void SetUp() {
+    test_path_ = "parquet-input-output-test.txt";
+    DeleteTestFile();
+    test_data_ = "testingdata";
+
+    // Scoped so the stream is flushed and closed before any test opens the file.
+    {
+      std::ofstream stream(test_path_.c_str(), std::ios::binary);
+      stream << test_data_;
+    }
+    filesize_ = static_cast<int64_t>(test_data_.size());
+  }
+
+  void TearDown() {
+    DeleteTestFile();
+  }
+
+  void DeleteTestFile() {
+    if (file_exists(test_path_)) {
+      std::remove(test_path_.c_str());
+    }
+  }
+
+ protected:
+  ReaderType source_;
+  std::string test_path_;
+  std::string test_data_;
+  int64_t filesize_;
+};
+
+typedef ::testing::Types<LocalFileSource, MemoryMapSource> ReaderTypes;
+
+TYPED_TEST_CASE(TestFileReaders, ReaderTypes);
+
+TYPED_TEST(TestFileReaders, NonExistentFile) {
+  ASSERT_THROW(this->source_.Open("0xDEADBEEF.txt"), ParquetException);
+}
+
+TYPED_TEST(TestFileReaders, Read) {
+  this->source_.Open(this->test_path_);
+
+  ASSERT_EQ(this->filesize_, this->source_.Size());
+
+  std::shared_ptr<Buffer> buffer = this->source_.Read(4);
+  ASSERT_EQ(4, buffer->size());
+  ASSERT_EQ(0, memcmp(this->test_data_.c_str(), buffer->data(), 4));
+  ASSERT_EQ(4, this->source_.Tell());
+
+  // Read past EOF: only the remaining 7 bytes come back
+  buffer = this->source_.Read(10);
+  ASSERT_EQ(7, buffer->size());
+  ASSERT_EQ(0, memcmp(this->test_data_.c_str() + 4, buffer->data(), 7));
+  ASSERT_EQ(this->filesize_, this->source_.Tell());
+}
+
+TYPED_TEST(TestFileReaders, FileDisappeared) {
+  this->source_.Open(this->test_path_);
+  this->source_.Seek(4);
+  this->DeleteTestFile();
+  // Closing must still succeed after the underlying file has been removed
+  ASSERT_NO_THROW(this->source_.Close());
+}
+
+TYPED_TEST(TestFileReaders, BadSeek) {
+  this->source_.Open(this->test_path_);
+
+  ASSERT_THROW(this->source_.Seek(-1), ParquetException);
+  ASSERT_THROW(this->source_.Seek(this->filesize_ + 1), ParquetException);
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
index a238ff6..897d81c 100644
--- a/src/parquet/util/input.cc
+++ b/src/parquet/util/input.cc
@@ -17,7 +17,9 @@
 
 #include "parquet/util/input.h"
 
+#include <sys/mman.h>
 #include <algorithm>
+#include <sstream>
 #include <string>
 
 #include "parquet/exception.h"
@@ -42,13 +44,32 @@ LocalFileSource::~LocalFileSource() {
 
 void LocalFileSource::Open(const std::string& path) {
   path_ = path;
-  file_ = fopen(path_.c_str(), "r");
+  file_ = fopen(path_.c_str(), "rb");
+  if (file_ == nullptr || ferror(file_)) {
+    std::stringstream ss;
+    ss << "Unable to open file: " << path;
+    throw ParquetException(ss.str());
+  }
   is_open_ = true;
-  fseek(file_, 0L, SEEK_END);
-  size_ = Tell();
+  SeekFile(0, SEEK_END);
+  size_ = LocalFileSource::Tell();
   Seek(0);
 }
 
+void LocalFileSource::SeekFile(int64_t pos, int origin) {
+  // [0, size_] is valid; size_ is EOF and the only position in an empty file.
+  if (origin == SEEK_SET && (pos < 0 || pos > size_)) {
+    std::stringstream ss;
+    ss << "Position " << pos << " is not in range.";
+    throw ParquetException(ss.str());
+  }
+  if (0 != fseek(file_, pos, origin)) {
+    std::stringstream ss;
+    ss << "File seek to position " << pos << " failed.";
+    throw ParquetException(ss.str());
+  }
+}
+
 void LocalFileSource::Close() {
   // Pure virtual
   CloseFile();
@@ -62,7 +83,7 @@ void LocalFileSource::CloseFile() {
 }
 
 void LocalFileSource::Seek(int64_t pos) {
-  fseek(file_, pos, SEEK_SET);
+  SeekFile(pos, SEEK_SET);  // absolute seek; SeekFile range-checks it
 }
 
 int64_t LocalFileSource::Size() const {
@@ -70,7 +91,15 @@ int64_t LocalFileSource::Size() const {
 }
 
 int64_t LocalFileSource::Tell() const {
-  return ftell(file_);
+  int64_t position = ftell(file_);
+  if (position < 0) {
+    throw ParquetException("ftell failed, did the file disappear?");
+  }
+  return position;
+}
+
+int LocalFileSource::file_descriptor() const {
+  return (is_open_ && file_ != nullptr) ? fileno(file_) : -1;  // -1 if not open
 }
 
 int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) {
@@ -87,6 +116,81 @@ std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
     }
   return result;
 }
+// ----------------------------------------------------------------------
+// MemoryMapSource methods
+
+MemoryMapSource::~MemoryMapSource() {
+  CloseFile();
+}
+
+void MemoryMapSource::Open(const std::string& path) {
+  LocalFileSource::Open(path);
+  pos_ = 0;
+  if (size_ == 0) {
+    // mmap() rejects zero-length mappings (EINVAL); an empty file needs no
+    // mapping since every read is clamped to size_ - pos_ == 0.
+    return;
+  }
+  void* mapped = mmap(nullptr, size_, PROT_READ, MAP_SHARED, fileno(file_), 0);
+  // mmap() signals failure with MAP_FAILED, not nullptr
+  if (mapped == MAP_FAILED) {
+    throw ParquetException("Memory mapping file failed");
+  }
+  data_ = reinterpret_cast<uint8_t*>(mapped);
+}
+
+void MemoryMapSource::Close() {
+  // Same teardown path as the destructor
+  CloseFile();
+}
+
+void MemoryMapSource::CloseFile() {
+  if (data_ != nullptr) {
+    munmap(data_, size_);
+    // Reset so a later call (e.g. the destructor after Close()) does not
+    // unmap the same range a second time.
+    data_ = nullptr;
+  }
+
+  LocalFileSource::CloseFile();
+}
+
+void MemoryMapSource::Seek(int64_t pos) {
+  // [0, size_] is valid; size_ is EOF and the only position in an empty file.
+  if (pos < 0 || pos > size_) {
+    std::stringstream ss;
+    ss << "Position " << pos << " is not in range.";
+    throw ParquetException(ss.str());
+  }
+
+  pos_ = pos;
+}
+
+int64_t MemoryMapSource::Tell() const {
+  return pos_;
+}
+
+int64_t MemoryMapSource::Read(int64_t nbytes, uint8_t* buffer) {
+  int64_t bytes_available = std::min(nbytes, size_ - pos_);
+  if (bytes_available <= 0) {
+    // EOF, empty file or nbytes <= 0: copy nothing rather than call memcpy()
+    // with a negative length or a null source pointer.
+    return 0;
+  }
+  memcpy(buffer, data_ + pos_, bytes_available);
+  pos_ += bytes_available;
+  return bytes_available;
+}
+
+std::shared_ptr<Buffer> MemoryMapSource::Read(int64_t nbytes) {
+  // Clamp at 0 so a negative request cannot move pos_ backwards.
+  int64_t bytes_available = std::max<int64_t>(0, std::min(nbytes, size_ - pos_));
+  // Zero-copy: the Buffer aliases the mapping and is only valid until
+  // this source is closed or destroyed.
+  auto result = std::make_shared<Buffer>(data_ + pos_, bytes_available);
+  pos_ += bytes_available;
+  return result;
+}
 
 // ----------------------------------------------------------------------
 // BufferReader

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
index 5f2bde3..80fb730 100644
--- a/src/parquet/util/input.h
+++ b/src/parquet/util/input.h
@@ -18,8 +18,8 @@
 #ifndef PARQUET_UTIL_INPUT_H
 #define PARQUET_UTIL_INPUT_H
 
-#include <stdio.h>
 #include <cstdint>
+#include <cstdio>
 #include <memory>
 #include <string>
 #include <vector>
@@ -58,7 +58,7 @@ class LocalFileSource : public RandomAccessSource {
   LocalFileSource() : file_(nullptr), is_open_(false) {}
   virtual ~LocalFileSource();
 
-  void Open(const std::string& path);
+  virtual void Open(const std::string& path);
 
   virtual void Close();
   virtual int64_t Size() const;
@@ -73,14 +73,58 @@ class LocalFileSource : public RandomAccessSource {
   bool is_open() const { return is_open_;}
   const std::string& path() const { return path_;}
 
- private:
+  // Return the integer file descriptor
+  int file_descriptor() const;
+
+ protected:
   void CloseFile();
+  void SeekFile(int64_t pos, int origin = SEEK_SET);
 
   std::string path_;
   FILE* file_;
   bool is_open_;
 };
 
+// A LocalFileSource that memory-maps the file read-only on Open() and
+// serves Tell/Seek/Read from the mapping rather than through stdio.
+class MemoryMapSource : public LocalFileSource {
+ public:
+  MemoryMapSource() :
+      LocalFileSource(),
+      data_(nullptr),
+      pos_(0) {}
+
+  virtual ~MemoryMapSource();
+
+  // Non-copyable: a copy would share data_ and unmap the same range twice.
+  MemoryMapSource(const MemoryMapSource&) = delete;
+  MemoryMapSource& operator=(const MemoryMapSource&) = delete;
+
+  virtual void Close();
+
+  // Open via LocalFileSource::Open, then memory-map the file read-only.
+  // Throws ParquetException if the file cannot be opened.
+  virtual void Open(const std::string& path);
+
+  virtual int64_t Tell() const;
+  virtual void Seek(int64_t pos);
+
+  // Copy data from memory map into out (must be already allocated memory)
+  // @returns: actual number of bytes read
+  virtual int64_t Read(int64_t nbytes, uint8_t* out);
+
+  // Return a buffer referencing memory-map (no copy). The buffer points into
+  // the mapping and must not be used after Close() or destruction.
+  virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
+
+ private:
+  // Unmap the file, then close it via LocalFileSource::CloseFile()
+  void CloseFile();
+
+  uint8_t* data_;
+  int64_t pos_;
+};
+
 // ----------------------------------------------------------------------
 // A file-like object that reads from virtual address space
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output-test.cc b/src/parquet/util/output-test.cc
deleted file mode 100644
index bae184a..0000000
--- a/src/parquet/util/output-test.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "parquet/util/buffer.h"
-#include "parquet/util/output.h"
-#include "parquet/util/test-common.h"
-
-namespace parquet_cpp {
-
-TEST(TestInMemoryOutputStream, Basics) {
-  std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
-
-  std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
-
-  stream->Write(&data[0], 4);
-  ASSERT_EQ(4, stream->Tell());
-  stream->Write(&data[4], data.size() - 4);
-
-  std::shared_ptr<Buffer> buffer = stream->GetBuffer();
-
-  Buffer data_buf(data.data(), data.size());
-
-  ASSERT_TRUE(data_buf.Equals(*buffer));
-}
-
-} // namespace parquet_cpp