You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/23 11:43:12 UTC

arrow git commit: ARROW-494: [C++] Extend lifetime of memory mapped data if any buffers reference it

Repository: arrow
Updated Branches:
  refs/heads/master 53a478dfb -> 69cdbd8ce


ARROW-494: [C++] Extend lifetime of memory mapped data if any buffers reference it

Before this patch, if you read memory in an IPC scenario and the `arrow::io::MemoryMappedFile` then went out of scope, the memory was unmapped even if `arrow::Buffer` objects still referenced it.

Author: Wes McKinney <we...@twosigma.com>

Closes #298 from wesm/ARROW-494 and squashes the following commits:

60222e3 [Wes McKinney] clang-format
2960d17 [Wes McKinney] Add C++ unit test
d7d776a [Wes McKinney] Add Python unit test where memory mapped file is garbage collected
edf1295 [Wes McKinney] Reimplement memory map owner as Buffer subclass so that MemoryMappedFile can be safely destructed without invalidating Buffers referencing the mapped data


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/69cdbd8c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/69cdbd8c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/69cdbd8c

Branch: refs/heads/master
Commit: 69cdbd8ce665138ce35bb34d0bbe8087c0e9513e
Parents: 53a478d
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jan 23 06:43:05 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jan 23 06:43:05 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/io/file.cc         | 94 +++++++++++++++++++----------------
 cpp/src/arrow/io/file.h          |  7 ++-
 cpp/src/arrow/io/io-file-test.cc | 31 ++++++++++++
 python/pyarrow/tests/test_io.py  | 20 +++++++-
 4 files changed, 104 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 1de6efa..3bf8dfa 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -372,6 +372,8 @@ class OSFile {
 
   int64_t size() const { return size_; }
 
+  FileMode::type mode() const { return mode_; }
+
  protected:
   std::string path_;
 
@@ -513,14 +515,14 @@ int FileOutputStream::file_descriptor() const {
 // ----------------------------------------------------------------------
 // Implement MemoryMappedFile
 
-class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
+class MemoryMappedFile::MemoryMap : public MutableBuffer {
  public:
-  MemoryMappedFileImpl() : OSFile(), data_(nullptr) {}
+  MemoryMap() : MutableBuffer(nullptr, 0) {}
 
-  ~MemoryMappedFileImpl() {
-    if (is_open_) {
-      munmap(data_, size_);
-      OSFile::Close();
+  ~MemoryMap() {
+    if (file_->is_open()) {
+      munmap(mutable_data_, size_);
+      file_->Close();
     }
   }
 
@@ -528,27 +530,35 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
     int prot_flags;
     int map_mode;
 
+    file_.reset(new OSFile());
+
     if (mode != FileMode::READ) {
       // Memory mapping has permission failures if PROT_READ not set
       prot_flags = PROT_READ | PROT_WRITE;
       map_mode = MAP_SHARED;
       constexpr bool append = true;
       constexpr bool write_only = false;
-      RETURN_NOT_OK(OSFile::OpenWriteable(path, append, write_only));
-      mode_ = mode;
+      RETURN_NOT_OK(file_->OpenWriteable(path, append, write_only));
+
+      is_mutable_ = true;
     } else {
       prot_flags = PROT_READ;
       map_mode = MAP_PRIVATE;  // Changes are not to be committed back to the file
-      RETURN_NOT_OK(OSFile::OpenReadable(path));
+      RETURN_NOT_OK(file_->OpenReadable(path));
+
+      is_mutable_ = false;
     }
 
-    void* result = mmap(nullptr, size_, prot_flags, map_mode, fd(), 0);
+    void* result = mmap(nullptr, file_->size(), prot_flags, map_mode, file_->fd(), 0);
     if (result == MAP_FAILED) {
       std::stringstream ss;
       ss << "Memory mapping file failed, errno: " << errno;
       return Status::IOError(ss.str());
     }
-    data_ = reinterpret_cast<uint8_t*>(result);
+
+    data_ = mutable_data_ = reinterpret_cast<uint8_t*>(result);
+    size_ = file_->size();
+
     position_ = 0;
 
     return Status::OK();
@@ -566,50 +576,45 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
 
   void advance(int64_t nbytes) { position_ = position_ + nbytes; }
 
-  uint8_t* data() { return data_; }
+  uint8_t* head() { return mutable_data_ + position_; }
 
-  uint8_t* head() { return data_ + position_; }
+  bool writable() { return file_->mode() != FileMode::READ; }
 
-  bool writable() { return mode_ != FileMode::READ; }
+  bool opened() { return file_->is_open(); }
 
-  bool opened() { return is_open_; }
+  int fd() const { return file_->fd(); }
 
  private:
+  std::unique_ptr<OSFile> file_;
   int64_t position_;
-
-  // The memory map
-  uint8_t* data_;
 };
 
-MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
-  ReadableFileInterface::set_mode(mode);
-}
-
+MemoryMappedFile::MemoryMappedFile() {}
 MemoryMappedFile::~MemoryMappedFile() {}
 
 Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
     std::shared_ptr<MemoryMappedFile>* out) {
-  std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile(mode));
+  std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile());
 
-  result->impl_.reset(new MemoryMappedFileImpl());
-  RETURN_NOT_OK(result->impl_->Open(path, mode));
+  result->memory_map_.reset(new MemoryMap());
+  RETURN_NOT_OK(result->memory_map_->Open(path, mode));
 
   *out = result;
   return Status::OK();
 }
 
 Status MemoryMappedFile::GetSize(int64_t* size) {
-  *size = impl_->size();
+  *size = memory_map_->size();
   return Status::OK();
 }
 
 Status MemoryMappedFile::Tell(int64_t* position) {
-  *position = impl_->position();
+  *position = memory_map_->position();
   return Status::OK();
 }
 
 Status MemoryMappedFile::Seek(int64_t position) {
-  return impl_->Seek(position);
+  return memory_map_->Seek(position);
 }
 
 Status MemoryMappedFile::Close() {
@@ -618,19 +623,24 @@ Status MemoryMappedFile::Close() {
 }
 
 Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
-  nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position()));
-  if (nbytes > 0) { std::memcpy(out, impl_->head(), nbytes); }
+  nbytes = std::max<int64_t>(
+      0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
+  if (nbytes > 0) { std::memcpy(out, memory_map_->head(), nbytes); }
   *bytes_read = nbytes;
-  impl_->advance(nbytes);
+  memory_map_->advance(nbytes);
   return Status::OK();
 }
 
 Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position()));
+  nbytes = std::max<int64_t>(
+      0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
 
-  const uint8_t* data = nbytes > 0 ? impl_->head() : nullptr;
-  *out = std::make_shared<Buffer>(data, nbytes);
-  impl_->advance(nbytes);
+  if (nbytes > 0) {
+    *out = SliceBuffer(memory_map_, memory_map_->position(), nbytes);
+  } else {
+    *out = std::make_shared<Buffer>(nullptr, 0);
+  }
+  memory_map_->advance(nbytes);
   return Status::OK();
 }
 
@@ -639,19 +649,19 @@ bool MemoryMappedFile::supports_zero_copy() const {
 }
 
 Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
-  if (!impl_->opened() || !impl_->writable()) {
+  if (!memory_map_->opened() || !memory_map_->writable()) {
     return Status::IOError("Unable to write");
   }
 
-  RETURN_NOT_OK(impl_->Seek(position));
+  RETURN_NOT_OK(memory_map_->Seek(position));
   return WriteInternal(data, nbytes);
 }
 
 Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
-  if (!impl_->opened() || !impl_->writable()) {
+  if (!memory_map_->opened() || !memory_map_->writable()) {
     return Status::IOError("Unable to write");
   }
-  if (nbytes + impl_->position() > impl_->size()) {
+  if (nbytes + memory_map_->position() > memory_map_->size()) {
     return Status::Invalid("Cannot write past end of memory map");
   }
 
@@ -659,13 +669,13 @@ Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
 }
 
 Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
-  memcpy(impl_->head(), data, nbytes);
-  impl_->advance(nbytes);
+  memcpy(memory_map_->head(), data, nbytes);
+  memory_map_->advance(nbytes);
   return Status::OK();
 }
 
 int MemoryMappedFile::file_descriptor() const {
-  return impl_->fd();
+  return memory_map_->fd();
 }
 
 }  // namespace io

http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 2387232..930346b 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -130,13 +130,12 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
   int file_descriptor() const;
 
  private:
-  explicit MemoryMappedFile(FileMode::type mode);
+  MemoryMappedFile();
 
   Status WriteInternal(const uint8_t* data, int64_t nbytes);
 
-  // Hide the internal details of this class for now
-  class ARROW_NO_EXPORT MemoryMappedFileImpl;
-  std::unique_ptr<MemoryMappedFileImpl> impl_;
+  class ARROW_NO_EXPORT MemoryMap;
+  std::shared_ptr<MemoryMap> memory_map_;
 };
 
 }  // namespace io

http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index f18f7b6..999b296 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -396,6 +396,37 @@ TEST_F(TestMemoryMappedFile, ReadOnly) {
   rommap->Close();
 }
 
+TEST_F(TestMemoryMappedFile, RetainMemoryMapReference) {
+  // ARROW-494
+
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+
+  test::random_bytes(1024, 0, buffer.data());
+
+  std::string path = "ipc-read-only-test";
+  CreateFile(path, buffer_size);
+
+  {
+    std::shared_ptr<MemoryMappedFile> rwmmap;
+    ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap));
+    ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
+    ASSERT_OK(rwmmap->Close());
+  }
+
+  std::shared_ptr<Buffer> out_buffer;
+
+  {
+    std::shared_ptr<MemoryMappedFile> rommap;
+    ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
+    ASSERT_OK(rommap->Read(buffer_size, &out_buffer));
+    ASSERT_OK(rommap->Close());
+  }
+
+  // valgrind will catch if memory is unmapped
+  ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+}
+
 TEST_F(TestMemoryMappedFile, InvalidMode) {
   const int64_t buffer_size = 1024;
   std::vector<uint8_t> buffer(buffer_size);

http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index f28d44a..dfa84a2 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -16,6 +16,7 @@
 # under the License.
 
 from io import BytesIO
+import gc
 import os
 import pytest
 
@@ -163,9 +164,8 @@ def test_inmemory_write_after_closed():
 # ----------------------------------------------------------------------
 # OS files and memory maps
 
-@pytest.fixture(scope='session')
+@pytest.fixture
 def sample_disk_data(request):
-
     SIZE = 4096
     arr = np.random.randint(0, 256, size=SIZE).astype('u1')
     data = arr.tobytes()[:SIZE]
@@ -206,6 +206,22 @@ def test_memory_map_reader(sample_disk_data):
     _check_native_file_reader(io.MemoryMappedFile, sample_disk_data)
 
 
+def test_memory_map_retain_buffer_reference(sample_disk_data):
+    path, data = sample_disk_data
+
+    cases = []
+    with io.MemoryMappedFile(path, 'rb') as f:
+        cases.append((f.read_buffer(100), data[:100]))
+        cases.append((f.read_buffer(100), data[100:200]))
+        cases.append((f.read_buffer(100), data[200:300]))
+
+    # Call gc.collect() for good measure
+    gc.collect()
+
+    for buf, expected in cases:
+        assert buf.to_pybytes() == expected
+
+
 def test_os_file_reader(sample_disk_data):
     _check_native_file_reader(io.OSFile, sample_disk_data)