You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/23 11:43:12 UTC
arrow git commit: ARROW-494: [C++] Extend lifetime of memory mapped
data if any buffers reference it
Repository: arrow
Updated Branches:
refs/heads/master 53a478dfb -> 69cdbd8ce
ARROW-494: [C++] Extend lifetime of memory mapped data if any buffers reference it
If you read memory in an IPC scenario and then the `arrow::io::MemoryMappedFile` goes out of scope, before this patch the memory was being unmapped even if there were `arrow::Buffer` objects referencing it.
Author: Wes McKinney <we...@twosigma.com>
Closes #298 from wesm/ARROW-494 and squashes the following commits:
60222e3 [Wes McKinney] clang-format
2960d17 [Wes McKinney] Add C++ unit test
d7d776a [Wes McKinney] Add Python unit test where memory mapped file is garbage collected
edf1295 [Wes McKinney] Reimplement memory map owner as Buffer subclass so that MemoryMappedFile can be safely destructed without invalidating Buffers referencing the mapped data
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/69cdbd8c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/69cdbd8c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/69cdbd8c
Branch: refs/heads/master
Commit: 69cdbd8ce665138ce35bb34d0bbe8087c0e9513e
Parents: 53a478d
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jan 23 06:43:05 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jan 23 06:43:05 2017 -0500
----------------------------------------------------------------------
cpp/src/arrow/io/file.cc | 94 +++++++++++++++++++----------------
cpp/src/arrow/io/file.h | 7 ++-
cpp/src/arrow/io/io-file-test.cc | 31 ++++++++++++
python/pyarrow/tests/test_io.py | 20 +++++++-
4 files changed, 104 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 1de6efa..3bf8dfa 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -372,6 +372,8 @@ class OSFile {
int64_t size() const { return size_; }
+ FileMode::type mode() const { return mode_; }
+
protected:
std::string path_;
@@ -513,14 +515,14 @@ int FileOutputStream::file_descriptor() const {
// ----------------------------------------------------------------------
// Implement MemoryMappedFile
-class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
+class MemoryMappedFile::MemoryMap : public MutableBuffer {
public:
- MemoryMappedFileImpl() : OSFile(), data_(nullptr) {}
+ MemoryMap() : MutableBuffer(nullptr, 0) {}
- ~MemoryMappedFileImpl() {
- if (is_open_) {
- munmap(data_, size_);
- OSFile::Close();
+ ~MemoryMap() {
+ if (file_->is_open()) {
+ munmap(mutable_data_, size_);
+ file_->Close();
}
}
@@ -528,27 +530,35 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
int prot_flags;
int map_mode;
+ file_.reset(new OSFile());
+
if (mode != FileMode::READ) {
// Memory mapping has permission failures if PROT_READ not set
prot_flags = PROT_READ | PROT_WRITE;
map_mode = MAP_SHARED;
constexpr bool append = true;
constexpr bool write_only = false;
- RETURN_NOT_OK(OSFile::OpenWriteable(path, append, write_only));
- mode_ = mode;
+ RETURN_NOT_OK(file_->OpenWriteable(path, append, write_only));
+
+ is_mutable_ = true;
} else {
prot_flags = PROT_READ;
map_mode = MAP_PRIVATE; // Changes are not to be committed back to the file
- RETURN_NOT_OK(OSFile::OpenReadable(path));
+ RETURN_NOT_OK(file_->OpenReadable(path));
+
+ is_mutable_ = false;
}
- void* result = mmap(nullptr, size_, prot_flags, map_mode, fd(), 0);
+ void* result = mmap(nullptr, file_->size(), prot_flags, map_mode, file_->fd(), 0);
if (result == MAP_FAILED) {
std::stringstream ss;
ss << "Memory mapping file failed, errno: " << errno;
return Status::IOError(ss.str());
}
- data_ = reinterpret_cast<uint8_t*>(result);
+
+ data_ = mutable_data_ = reinterpret_cast<uint8_t*>(result);
+ size_ = file_->size();
+
position_ = 0;
return Status::OK();
@@ -566,50 +576,45 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
void advance(int64_t nbytes) { position_ = position_ + nbytes; }
- uint8_t* data() { return data_; }
+ uint8_t* head() { return mutable_data_ + position_; }
- uint8_t* head() { return data_ + position_; }
+ bool writable() { return file_->mode() != FileMode::READ; }
- bool writable() { return mode_ != FileMode::READ; }
+ bool opened() { return file_->is_open(); }
- bool opened() { return is_open_; }
+ int fd() const { return file_->fd(); }
private:
+ std::unique_ptr<OSFile> file_;
int64_t position_;
-
- // The memory map
- uint8_t* data_;
};
-MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
- ReadableFileInterface::set_mode(mode);
-}
-
+MemoryMappedFile::MemoryMappedFile() {}
MemoryMappedFile::~MemoryMappedFile() {}
Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
std::shared_ptr<MemoryMappedFile>* out) {
- std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile(mode));
+ std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile());
- result->impl_.reset(new MemoryMappedFileImpl());
- RETURN_NOT_OK(result->impl_->Open(path, mode));
+ result->memory_map_.reset(new MemoryMap());
+ RETURN_NOT_OK(result->memory_map_->Open(path, mode));
*out = result;
return Status::OK();
}
Status MemoryMappedFile::GetSize(int64_t* size) {
- *size = impl_->size();
+ *size = memory_map_->size();
return Status::OK();
}
Status MemoryMappedFile::Tell(int64_t* position) {
- *position = impl_->position();
+ *position = memory_map_->position();
return Status::OK();
}
Status MemoryMappedFile::Seek(int64_t position) {
- return impl_->Seek(position);
+ return memory_map_->Seek(position);
}
Status MemoryMappedFile::Close() {
@@ -618,19 +623,24 @@ Status MemoryMappedFile::Close() {
}
Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
- nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position()));
- if (nbytes > 0) { std::memcpy(out, impl_->head(), nbytes); }
+ nbytes = std::max<int64_t>(
+ 0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
+ if (nbytes > 0) { std::memcpy(out, memory_map_->head(), nbytes); }
*bytes_read = nbytes;
- impl_->advance(nbytes);
+ memory_map_->advance(nbytes);
return Status::OK();
}
Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
- nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position()));
+ nbytes = std::max<int64_t>(
+ 0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
- const uint8_t* data = nbytes > 0 ? impl_->head() : nullptr;
- *out = std::make_shared<Buffer>(data, nbytes);
- impl_->advance(nbytes);
+ if (nbytes > 0) {
+ *out = SliceBuffer(memory_map_, memory_map_->position(), nbytes);
+ } else {
+ *out = std::make_shared<Buffer>(nullptr, 0);
+ }
+ memory_map_->advance(nbytes);
return Status::OK();
}
@@ -639,19 +649,19 @@ bool MemoryMappedFile::supports_zero_copy() const {
}
Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
- if (!impl_->opened() || !impl_->writable()) {
+ if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
}
- RETURN_NOT_OK(impl_->Seek(position));
+ RETURN_NOT_OK(memory_map_->Seek(position));
return WriteInternal(data, nbytes);
}
Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
- if (!impl_->opened() || !impl_->writable()) {
+ if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
}
- if (nbytes + impl_->position() > impl_->size()) {
+ if (nbytes + memory_map_->position() > memory_map_->size()) {
return Status::Invalid("Cannot write past end of memory map");
}
@@ -659,13 +669,13 @@ Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
}
Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
- memcpy(impl_->head(), data, nbytes);
- impl_->advance(nbytes);
+ memcpy(memory_map_->head(), data, nbytes);
+ memory_map_->advance(nbytes);
return Status::OK();
}
int MemoryMappedFile::file_descriptor() const {
- return impl_->fd();
+ return memory_map_->fd();
}
} // namespace io
http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 2387232..930346b 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -130,13 +130,12 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
int file_descriptor() const;
private:
- explicit MemoryMappedFile(FileMode::type mode);
+ MemoryMappedFile();
Status WriteInternal(const uint8_t* data, int64_t nbytes);
- // Hide the internal details of this class for now
- class ARROW_NO_EXPORT MemoryMappedFileImpl;
- std::unique_ptr<MemoryMappedFileImpl> impl_;
+ class ARROW_NO_EXPORT MemoryMap;
+ std::shared_ptr<MemoryMap> memory_map_;
};
} // namespace io
http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index f18f7b6..999b296 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -396,6 +396,37 @@ TEST_F(TestMemoryMappedFile, ReadOnly) {
rommap->Close();
}
+TEST_F(TestMemoryMappedFile, RetainMemoryMapReference) {
+ // ARROW-494
+
+ const int64_t buffer_size = 1024;
+ std::vector<uint8_t> buffer(buffer_size);
+
+ test::random_bytes(1024, 0, buffer.data());
+
+ std::string path = "ipc-read-only-test";
+ CreateFile(path, buffer_size);
+
+ {
+ std::shared_ptr<MemoryMappedFile> rwmmap;
+ ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap));
+ ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
+ ASSERT_OK(rwmmap->Close());
+ }
+
+ std::shared_ptr<Buffer> out_buffer;
+
+ {
+ std::shared_ptr<MemoryMappedFile> rommap;
+ ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
+ ASSERT_OK(rommap->Read(buffer_size, &out_buffer));
+ ASSERT_OK(rommap->Close());
+ }
+
+ // valgrind will catch if memory is unmapped
+ ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+}
+
TEST_F(TestMemoryMappedFile, InvalidMode) {
const int64_t buffer_size = 1024;
std::vector<uint8_t> buffer(buffer_size);
http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index f28d44a..dfa84a2 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -16,6 +16,7 @@
# under the License.
from io import BytesIO
+import gc
import os
import pytest
@@ -163,9 +164,8 @@ def test_inmemory_write_after_closed():
# ----------------------------------------------------------------------
# OS files and memory maps
-@pytest.fixture(scope='session')
+@pytest.fixture
def sample_disk_data(request):
-
SIZE = 4096
arr = np.random.randint(0, 256, size=SIZE).astype('u1')
data = arr.tobytes()[:SIZE]
@@ -206,6 +206,22 @@ def test_memory_map_reader(sample_disk_data):
_check_native_file_reader(io.MemoryMappedFile, sample_disk_data)
+def test_memory_map_retain_buffer_reference(sample_disk_data):
+ path, data = sample_disk_data
+
+ cases = []
+ with io.MemoryMappedFile(path, 'rb') as f:
+ cases.append((f.read_buffer(100), data[:100]))
+ cases.append((f.read_buffer(100), data[100:200]))
+ cases.append((f.read_buffer(100), data[200:300]))
+
+ # Call gc.collect() for good measure
+ gc.collect()
+
+ for buf, expected in cases:
+ assert buf.to_pybytes() == expected
+
+
def test_os_file_reader(sample_disk_data):
_check_native_file_reader(io.OSFile, sample_disk_data)