You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2017/01/03 07:29:02 UTC
arrow git commit: ARROW-294: [C++] Do not use platform-dependent
fopen/fclose functions for MemoryMappedFile
Repository: arrow
Updated Branches:
refs/heads/master 9f7d4ae6d -> d9df55679
ARROW-294: [C++] Do not use platform-dependent fopen/fclose functions for MemoryMappedFile
Also adds a test case for ARROW-340.
Author: Wes McKinney <we...@twosigma.com>
Closes #265 from wesm/ARROW-294 and squashes the following commits:
42a83a4 [Wes McKinney] Remove duplicated includes
3928ab0 [Wes McKinney] Base MemoryMappedFile implementation on common OSFile interface. Add test case for ARROW-340.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/d9df5567
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/d9df5567
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/d9df5567
Branch: refs/heads/master
Commit: d9df556791fc6051b2c8582668df9c256f675116
Parents: 9f7d4ae
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Jan 3 08:28:46 2017 +0100
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Tue Jan 3 08:28:46 2017 +0100
----------------------------------------------------------------------
cpp/src/arrow/io/file.cc | 208 +++++++++++++++++++++++++++++---
cpp/src/arrow/io/file.h | 49 ++++++++
cpp/src/arrow/io/io-file-test.cc | 116 +++++++++++++++++-
cpp/src/arrow/io/io-memory-test.cc | 91 --------------
cpp/src/arrow/io/memory.cc | 178 ---------------------------
cpp/src/arrow/io/memory.h | 39 ------
cpp/src/arrow/io/test-common.h | 1 +
7 files changed, 359 insertions(+), 323 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/d9df5567/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index c50a9bb..3182f2d 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -60,7 +60,7 @@
#endif // _MSC_VER
-// defines that
+// defines that don't exist in MinGW
#if defined(__MINGW32__)
#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR
#elif defined(_MSC_VER) // Visual Studio
@@ -174,7 +174,8 @@ static inline Status FileOpenReadable(const std::string& filename, int* fd) {
return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size());
}
-static inline Status FileOpenWriteable(const std::string& filename, int* fd) {
+// Open `filename` for writing, creating it if it does not exist, and store the
+// resulting descriptor in *fd.
+//   write_only: open write-only (_O_WRONLY / O_WRONLY) instead of read-write
+//   truncate:   discard any existing contents (_O_TRUNC / O_TRUNC)
+// No append flag is set, so where writes land depends on the descriptor offset.
+// The raw open result is passed to CheckOpenResult to build the returned Status.
+static inline Status FileOpenWriteable(
+ const std::string& filename, bool write_only, bool truncate, int* fd) {
int ret;
errno_t errno_actual = 0;
@@ -186,13 +187,31 @@ static inline Status FileOpenWriteable(const std::string& filename, int* fd) {
memcpy(wpath.data(), filename.data(), filename.size());
memcpy(wpath.data() + nwchars, L"\0", sizeof(wchar_t));
- errno_actual = _wsopen_s(fd, wpath.data(), _O_WRONLY | _O_CREAT | _O_BINARY | _O_TRUNC,
- _SH_DENYNO, _S_IWRITE);
+ // Translate the options into _wsopen_s flags
+ int oflag = _O_CREAT | _O_BINARY;
+
+ if (truncate) { oflag |= _O_TRUNC; }
+
+ if (write_only) {
+ oflag |= _O_WRONLY;
+ } else {
+ oflag |= _O_RDWR;
+ }
+
+ errno_actual = _wsopen_s(fd, wpath.data(), oflag, _SH_DENYNO, _S_IWRITE);
ret = *fd;
#else
- ret = *fd =
- open(filename.c_str(), O_WRONLY | O_CREAT | O_BINARY | O_TRUNC, ARROW_WRITE_SHMODE);
+ // Same flag selection as the Windows branch, using the POSIX spellings
+ int oflag = O_CREAT | O_BINARY;
+
+ if (truncate) { oflag |= O_TRUNC; }
+
+ if (write_only) {
+ oflag |= O_WRONLY;
+ } else {
+ oflag |= O_RDWR;
+ }
+
+ ret = *fd = open(filename.c_str(), oflag, ARROW_WRITE_SHMODE);
#endif
return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size());
}
@@ -296,10 +315,17 @@ class OSFile {
~OSFile() {}
- Status OpenWritable(const std::string& path) {
- RETURN_NOT_OK(FileOpenWriteable(path, &fd_));
+ // Open `path` for writing, creating it if necessary.
+ //   append:     keep existing contents and record the current file length in
+ //               size_; otherwise the file is truncated and size_ is 0
+ //   write_only: open write-only (mode_ = WRITE) rather than read-write
+ //               (mode_ = READWRITE)
+ //
+ // NOTE(review): OpenReadable relies on FileGetSize leaving the descriptor
+ // offset where it was. If that holds, an append-mode open leaves the offset
+ // at 0 and sequential writes overwrite the start of the file instead of
+ // appending. Consider O_APPEND or seeking to size_ -- TODO confirm.
+ Status OpenWriteable(const std::string& path, bool append, bool write_only) {
+ RETURN_NOT_OK(FileOpenWriteable(path, write_only, !append, &fd_));
path_ = path;
is_open_ = true;
+ // A write-only descriptor must never be reported as READ: writability
+ // checks compare mode_ against FileMode::READ
+ mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
+
+ if (append) {
+ RETURN_NOT_OK(FileGetSize(fd_, &size_));
+ } else {
+ size_ = 0;
+ }
return Status::OK();
}
@@ -307,11 +333,9 @@ class OSFile {
RETURN_NOT_OK(FileOpenReadable(path, &fd_));
RETURN_NOT_OK(FileGetSize(fd_, &size_));
- // The position should be 0 after GetSize
- // RETURN_NOT_OK(Seek(0));
-
path_ = path;
is_open_ = true;
+ mode_ = FileMode::READ;
return Status::OK();
}
@@ -346,12 +370,14 @@ class OSFile {
int64_t size() const { return size_; }
- private:
+ // protected rather than private so that subclasses such as
+ // MemoryMappedFile::MemoryMappedFileImpl can read the descriptor state
+ protected:
std::string path_;
// File descriptor
int fd_;
+ // Access mode recorded by OpenReadable / OpenWriteable
+ FileMode::type mode_;
+
+ // Set once an Open* call has succeeded
bool is_open_;
+ // File length in bytes as determined when the file was opened
int64_t size_;
};
@@ -440,7 +466,9 @@ int ReadableFile::file_descriptor() const {
class FileOutputStream::FileOutputStreamImpl : public OSFile {
public:
- Status Open(const std::string& path) { return OpenWritable(path); }
+ // An output stream never reads back, so the descriptor is opened write-only;
+ // `append` controls whether existing contents survive the open
+ Status Open(const std::string& path, bool append) {
+ const bool write_only = true;
+ return OpenWriteable(path, append, write_only);
+ }
};
FileOutputStream::FileOutputStream() {
@@ -453,9 +481,14 @@ FileOutputStream::~FileOutputStream() {
Status FileOutputStream::Open(
const std::string& path, std::shared_ptr<FileOutputStream>* file) {
+ // Default behaviour: truncate any existing file at `path`
+ const bool append = false;
+ return Open(path, append, file);
+}
+
+// Open `path` for writing. With append == false an existing file is truncated
+// to zero length; with append == true its contents are kept. *file is assigned
+// only after the underlying file has been opened successfully, matching
+// MemoryMappedFile::Open, so callers never receive a half-opened stream.
+Status FileOutputStream::Open(
+ const std::string& path, bool append, std::shared_ptr<FileOutputStream>* file) {
// private ctor
- *file = std::shared_ptr<FileOutputStream>(new FileOutputStream());
- return (*file)->impl_->Open(path);
+ std::shared_ptr<FileOutputStream> stream(new FileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(path, append));
+ *file = stream;
+ return Status::OK();
}
Status FileOutputStream::Close() {
@@ -474,5 +507,152 @@ int FileOutputStream::file_descriptor() const {
return impl_->fd();
}
+// ----------------------------------------------------------------------
+// Implement MemoryMappedFile
+
+// Implementation state behind MemoryMappedFile: the descriptor (inherited
+// from OSFile), the mapped region and the logical read/write cursor.
+class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
+ public:
+ MemoryMappedFileImpl() : OSFile(), position_(0), data_(nullptr) {}
+
+ ~MemoryMappedFileImpl() {
+ if (is_open_) {
+ // data_ stays nullptr for an empty file or after a failed mmap; there is
+ // nothing to unmap in either case
+ if (data_ != nullptr) { munmap(data_, size_); }
+ OSFile::Close();
+ }
+ }
+
+ // Open `path` and map its full current length.
+ //
+ // FileMode::READ opens the file read-only and maps it PROT_READ. Any other
+ // mode maps it PROT_READ | PROT_WRITE. In that case the descriptor is always
+ // opened read-write: POSIX mmap needs a descriptor open for reading (and,
+ // for a shared writable mapping, for writing too), so an O_WRONLY
+ // descriptor makes mmap fail with EACCES. The file is never truncated.
+ //
+ // An empty file is accepted and represented by a null, zero-length map,
+ // because mmap rejects a length of 0.
+ Status Open(const std::string& path, FileMode::type mode) {
+ int prot_flags = PROT_READ;
+
+ if (mode != FileMode::READ) {
+ prot_flags |= PROT_WRITE;
+ const bool append = true;
+ const bool write_only = false;
+ RETURN_NOT_OK(OSFile::OpenWriteable(path, append, write_only));
+ } else {
+ RETURN_NOT_OK(OSFile::OpenReadable(path));
+ }
+
+ position_ = 0;
+ data_ = nullptr;
+
+ if (size_ == 0) { return Status::OK(); }
+
+ void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fd(), 0);
+ if (result == MAP_FAILED) {
+ std::stringstream ss;
+ ss << "Memory mapping file failed, errno: " << errno;
+ return Status::IOError(ss.str());
+ }
+ data_ = reinterpret_cast<uint8_t*>(result);
+
+ return Status::OK();
+ }
+
+ int64_t size() const { return size_; }
+
+ // Move the cursor to `position`, which must lie in [0, size)
+ Status Seek(int64_t position) {
+ if (position < 0 || position >= size_) {
+ return Status::Invalid("position is out of bounds");
+ }
+ position_ = position;
+ return Status::OK();
+ }
+
+ int64_t position() { return position_; }
+
+ // Advance the cursor by nbytes, clamped to the end of the map
+ void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); }
+
+ uint8_t* data() { return data_; }
+
+ // Pointer to the byte at the cursor (nullptr for an empty file)
+ uint8_t* head() { return data_ + position_; }
+
+ bool writable() { return mode_ != FileMode::READ; }
+
+ bool opened() { return is_open_; }
+
+ private:
+ // Current read/write offset into the map
+ int64_t position_;
+
+ // The memory map (nullptr until a non-empty file has been mapped)
+ uint8_t* data_;
+};
+
+// Records the requested mode only; the mapping itself is created by Open().
+MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
+ ReadableFileInterface::set_mode(mode);
+}
+
+MemoryMappedFile::~MemoryMappedFile() {}
+
+// Factory: builds the object (the constructor is private, so make_shared is
+// not usable), opens and maps `path`, and publishes it through *out only if
+// every step succeeded.
+Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
+ std::shared_ptr<MemoryMappedFile>* out) {
+ std::shared_ptr<MemoryMappedFile> mapped_file(new MemoryMappedFile(mode));
+ mapped_file->impl_.reset(new MemoryMappedFileImpl());
+
+ Status open_status = mapped_file->impl_->Open(path, mode);
+ if (!open_status.ok()) { return open_status; }
+
+ *out = mapped_file;
+ return Status::OK();
+}
+
+// Length of the mapping in bytes, fixed when the file was opened.
+Status MemoryMappedFile::GetSize(int64_t* size) {
+ const int64_t mapped_bytes = impl_->size();
+ *size = mapped_bytes;
+ return Status::OK();
+}
+
+// Current cursor offset used by Read and Write.
+Status MemoryMappedFile::Tell(int64_t* position) {
+ const int64_t cursor = impl_->position();
+ *position = cursor;
+ return Status::OK();
+}
+
+// Move the cursor; out-of-range offsets are rejected by the implementation.
+Status MemoryMappedFile::Seek(int64_t position) {
+ Status seek_status = impl_->Seek(position);
+ return seek_status;
+}
+
+// Intentionally does nothing: ~MemoryMappedFileImpl unmaps the region and
+// closes the descriptor when this object is destroyed, so Buffers returned by
+// the zero-copy Read remain valid until then.
+Status MemoryMappedFile::Close() {
+ return Status::OK();
+}
+
+// Copy up to nbytes from the cursor into `out`; reads are truncated at the end
+// of the map and the cursor advances by the number of bytes copied.
+Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+ const int64_t remaining = impl_->size() - impl_->position();
+ const int64_t to_copy = std::min(nbytes, remaining);
+ std::memcpy(out, impl_->head(), to_copy);
+ impl_->advance(to_copy);
+ *bytes_read = to_copy;
+ return Status::OK();
+}
+
+// Zero-copy read: the returned Buffer points straight into the mapping and
+// does not own it.
+Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+ const int64_t remaining = impl_->size() - impl_->position();
+ const int64_t length = std::min(nbytes, remaining);
+ *out = std::make_shared<Buffer>(impl_->head(), length);
+ impl_->advance(length);
+ return Status::OK();
+}
+
+// Always true: the Buffer overload of Read hands out views into the mapping.
+bool MemoryMappedFile::supports_zero_copy() const { return true; }
+
+// Write `nbytes` from `data` starting at absolute offset `position`, leaving
+// the cursor just past the written range. The whole range must lie inside the
+// mapping: the file is never grown.
+Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
+ if (!impl_->opened() || !impl_->writable()) {
+ return Status::IOError("Unable to write");
+ }
+
+ // Seek only validates 0 <= position < size. The end of the range must be
+ // checked separately, or memcpy would run past the end of the mapping.
+ RETURN_NOT_OK(impl_->Seek(position));
+ if (nbytes > impl_->size() - position) {
+ return Status::Invalid("Cannot write past end of memory map");
+ }
+ return WriteInternal(data, nbytes);
+}
+
+// Write at the cursor. The map cannot grow, so a write that would pass its end
+// is rejected before any byte is copied.
+Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
+ const bool can_write = impl_->opened() && impl_->writable();
+ if (!can_write) { return Status::IOError("Unable to write"); }
+
+ const int64_t end = impl_->position() + nbytes;
+ if (end > impl_->size()) { return Status::Invalid("Cannot write past end of memory map"); }
+
+ return WriteInternal(data, nbytes);
+}
+
+// Unchecked copy into the map at the cursor, then advance the cursor; callers
+// are responsible for bounds and mode checks.
+Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
+ uint8_t* dst = impl_->head();
+ std::memcpy(dst, data, nbytes);
+ impl_->advance(nbytes);
+ return Status::OK();
+}
+
} // namespace io
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/d9df5567/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 10fe16e..9ca9c54 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -40,8 +40,13 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
public:
~FileOutputStream();
+ // Open `path` for writing. Any existing file at that path is truncated to
+ // 0 bytes, discarding its previous contents
static Status Open(const std::string& path, std::shared_ptr<FileOutputStream>* file);
+ // Open `path` for writing. If `append` is false this truncates exactly like
+ // the two-argument overload; if true, an existing file is opened without
+ // truncation and its contents are kept.
+ // NOTE(review): confirm that subsequent writes land at the end of the file
+ // rather than at offset 0
+ static Status Open(
+ const std::string& path, bool append, std::shared_ptr<FileOutputStream>* file);
+
// OutputStream interface
Status Close() override;
Status Tell(int64_t* position) override;
@@ -88,6 +93,50 @@ class ARROW_EXPORT ReadableFile : public ReadableFileInterface {
std::unique_ptr<ReadableFileImpl> impl_;
};
+// A file interface that uses memory-mapped files for memory interactions,
+// supporting zero copy reads. The same class is used for both reading and
+// writing.
+//
+// Unlike FileOutputStream, opening a file in a writeable mode does not
+// truncate it. The mapping covers the file's length at the time it is opened;
+// Write() refuses to go past that length, so this class cannot grow a file.
+class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
+ public:
+ ~MemoryMappedFile();
+
+ // Open and map `path`. FileMode::READ maps the file read-only; any other
+ // mode maps it read-write. *out is assigned only on success.
+ static Status Open(const std::string& path, FileMode::type mode,
+ std::shared_ptr<MemoryMappedFile>* out);
+
+ // Currently a no-op; the mapping and descriptor are released when the
+ // object is destroyed
+ Status Close() override;
+
+ Status Tell(int64_t* position) override;
+
+ Status Seek(int64_t position) override;
+
+ // Required by ReadableFileInterface, copies memory into out
+ Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
+
+ // Zero copy read: the returned Buffer points into the mapping and does not
+ // own it
+ Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+ bool supports_zero_copy() const override;
+
+ // Write at the current position; fails if the write would pass the end of
+ // the mapping
+ Status Write(const uint8_t* data, int64_t nbytes) override;
+
+ // Write at absolute offset `position` (the cursor is moved there first)
+ Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;
+
+ // @return: the size in bytes of the memory source
+ Status GetSize(int64_t* size) override;
+
+ private:
+ explicit MemoryMappedFile(FileMode::type mode);
+
+ // Copies into the map at the cursor without bounds or mode checks
+ Status WriteInternal(const uint8_t* data, int64_t nbytes);
+
+ // Hide the internal details of this class for now
+ class ARROW_NO_EXPORT MemoryMappedFileImpl;
+ std::unique_ptr<MemoryMappedFileImpl> impl_;
+};
+
} // namespace io
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/d9df5567/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index f0ea7ec..5f5d639 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -71,7 +71,9 @@ class FileTestFixture : public ::testing::Test {
class TestFileOutputStream : public FileTestFixture {
public:
- void OpenFile() { ASSERT_OK(FileOutputStream::Open(path_, &file_)); }
+ // Open path_ into file_; pass append = true to keep existing contents
+ void OpenFile(bool append = false) {
+ ASSERT_OK(FileOutputStream::Open(path_, append, &file_));
+ }
protected:
std::shared_ptr<FileOutputStream> file_;
@@ -131,6 +133,24 @@ TEST_F(TestFileOutputStream, Tell) {
ASSERT_EQ(8, position);
}
+// Re-opening an existing file with the default (non-append) Open must leave
+// it empty.
+TEST_F(TestFileOutputStream, TruncatesNewFile) {
+ const std::string contents = "testdata";
+
+ ASSERT_OK(FileOutputStream::Open(path_, &file_));
+ ASSERT_OK(file_->Write(reinterpret_cast<const uint8_t*>(contents.data()), contents.size()));
+ ASSERT_OK(file_->Close());
+
+ // Open again without append and close immediately, writing nothing
+ ASSERT_OK(FileOutputStream::Open(path_, &file_));
+ ASSERT_OK(file_->Close());
+
+ std::shared_ptr<ReadableFile> reader;
+ ASSERT_OK(ReadableFile::Open(path_, &reader));
+
+ int64_t file_size = -1;
+ ASSERT_OK(reader->GetSize(&file_size));
+ ASSERT_EQ(0, file_size);
+}
+
// ----------------------------------------------------------------------
// File input tests
@@ -293,5 +313,99 @@ TEST_F(TestReadableFile, CustomMemoryPool) {
ASSERT_EQ(2, pool.num_allocations());
}
+// ----------------------------------------------------------------------
+// Memory map tests
+
+// Fixture for MemoryMappedFile tests. CreateFile() and TearDown() come from
+// MemoryMapFixture (declared outside this view, presumably in test-common.h).
+class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture {
+ public:
+ // Forward explicitly because both base classes declare TearDown
+ void TearDown() { MemoryMapFixture::TearDown(); }
+};
+
+// Placeholder: currently contains no assertions
+TEST_F(TestMemoryMappedFile, InvalidUsages) {}
+
+// Sequential writes through a read-write mapping are immediately visible to
+// ReadAt on the same mapping.
+TEST_F(TestMemoryMappedFile, WriteRead) {
+ const int64_t buffer_size = 1024;
+ std::vector<uint8_t> buffer(buffer_size);
+
+ // Derive the fill length from buffer_size so the two cannot drift apart
+ test::random_bytes(static_cast<int>(buffer_size), 0, buffer.data());
+
+ const int reps = 5;
+
+ std::string path = "ipc-write-read-test";
+ CreateFile(path, reps * buffer_size);
+
+ std::shared_ptr<MemoryMappedFile> result;
+ ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &result));
+
+ // Each Write advances the cursor; ReadAt re-reads the chunk just written
+ int64_t position = 0;
+ std::shared_ptr<Buffer> out_buffer;
+ for (int i = 0; i < reps; ++i) {
+ ASSERT_OK(result->Write(buffer.data(), buffer_size));
+ ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
+
+ ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+
+ position += buffer_size;
+ }
+}
+
+// Data written through a read-write mapping is readable through a separate
+// read-only mapping of the same file.
+TEST_F(TestMemoryMappedFile, ReadOnly) {
+ const int64_t buffer_size = 1024;
+ std::vector<uint8_t> buffer(buffer_size);
+
+ test::random_bytes(static_cast<int>(buffer_size), 0, buffer.data());
+
+ const int reps = 5;
+
+ std::string path = "ipc-read-only-test";
+ CreateFile(path, reps * buffer_size);
+
+ // Fill the file through a read-write mapping...
+ std::shared_ptr<MemoryMappedFile> rwmmap;
+ ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap));
+
+ for (int i = 0; i < reps; ++i) {
+ ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
+ }
+ ASSERT_OK(rwmmap->Close());
+
+ // ...then verify every chunk through a read-only mapping
+ std::shared_ptr<MemoryMappedFile> rommap;
+ ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
+
+ int64_t position = 0;
+ std::shared_ptr<Buffer> out_buffer;
+ for (int i = 0; i < reps; ++i) {
+ ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));
+
+ ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+ position += buffer_size;
+ }
+ ASSERT_OK(rommap->Close());
+}
+
+// Writing through a mapping opened in READ mode must fail with IOError.
+TEST_F(TestMemoryMappedFile, InvalidMode) {
+ const int64_t buffer_size = 1024;
+ std::vector<uint8_t> buffer(buffer_size);
+
+ test::random_bytes(static_cast<int>(buffer_size), 0, buffer.data());
+
+ std::string path = "ipc-invalid-mode-test";
+ CreateFile(path, buffer_size);
+
+ std::shared_ptr<MemoryMappedFile> rommap;
+ ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
+
+ ASSERT_RAISES(IOError, rommap->Write(buffer.data(), buffer_size));
+}
+
+// Opening a path that does not exist in READ mode must fail with IOError.
+TEST_F(TestMemoryMappedFile, InvalidFile) {
+ const std::string missing_path = "invalid-file-name-asfd";
+
+ std::shared_ptr<MemoryMappedFile> mapped;
+ Status open_status = MemoryMappedFile::Open(missing_path, FileMode::READ, &mapped);
+ ASSERT_RAISES(IOError, open_status);
+}
+
} // namespace io
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/d9df5567/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index a49faf3..2463102 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -30,97 +30,6 @@
namespace arrow {
namespace io {
-class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture {
- public:
- void TearDown() { MemoryMapFixture::TearDown(); }
-};
-
-TEST_F(TestMemoryMappedFile, InvalidUsages) {}
-
-TEST_F(TestMemoryMappedFile, WriteRead) {
- const int64_t buffer_size = 1024;
- std::vector<uint8_t> buffer(buffer_size);
-
- test::random_bytes(1024, 0, buffer.data());
-
- const int reps = 5;
-
- std::string path = "ipc-write-read-test";
- CreateFile(path, reps * buffer_size);
-
- std::shared_ptr<MemoryMappedFile> result;
- ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &result));
-
- int64_t position = 0;
- std::shared_ptr<Buffer> out_buffer;
- for (int i = 0; i < reps; ++i) {
- ASSERT_OK(result->Write(buffer.data(), buffer_size));
- ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
-
- ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
-
- position += buffer_size;
- }
-}
-
-TEST_F(TestMemoryMappedFile, ReadOnly) {
- const int64_t buffer_size = 1024;
- std::vector<uint8_t> buffer(buffer_size);
-
- test::random_bytes(1024, 0, buffer.data());
-
- const int reps = 5;
-
- std::string path = "ipc-read-only-test";
- CreateFile(path, reps * buffer_size);
-
- std::shared_ptr<MemoryMappedFile> rwmmap;
- ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap));
-
- int64_t position = 0;
- for (int i = 0; i < reps; ++i) {
- ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
- position += buffer_size;
- }
- rwmmap->Close();
-
- std::shared_ptr<MemoryMappedFile> rommap;
- ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
-
- position = 0;
- std::shared_ptr<Buffer> out_buffer;
- for (int i = 0; i < reps; ++i) {
- ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));
-
- ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
- position += buffer_size;
- }
- rommap->Close();
-}
-
-TEST_F(TestMemoryMappedFile, InvalidMode) {
- const int64_t buffer_size = 1024;
- std::vector<uint8_t> buffer(buffer_size);
-
- test::random_bytes(1024, 0, buffer.data());
-
- std::string path = "ipc-invalid-mode-test";
- CreateFile(path, buffer_size);
-
- std::shared_ptr<MemoryMappedFile> rommap;
- ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
-
- ASSERT_RAISES(IOError, rommap->Write(buffer.data(), buffer_size));
-}
-
-TEST_F(TestMemoryMappedFile, InvalidFile) {
- std::string non_existent_path = "invalid-file-name-asfd";
-
- std::shared_ptr<MemoryMappedFile> result;
- ASSERT_RAISES(
- IOError, MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result));
-}
-
class TestBufferOutputStream : public ::testing::Test {
public:
void SetUp() {
http://git-wip-us.apache.org/repos/asf/arrow/blob/d9df5567/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index b5cf4b7..4595268 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -17,19 +17,6 @@
#include "arrow/io/memory.h"
-// sys/mman.h not present in Visual Studio or Cygwin
-#ifdef _WIN32
-#ifndef NOMINMAX
-#define NOMINMAX
-#endif
-#include "arrow/io/mman.h"
-#undef Realloc
-#undef Free
-#include <windows.h>
-#else
-#include <sys/mman.h>
-#endif
-
#include <algorithm>
#include <cerrno>
#include <cstdint>
@@ -45,171 +32,6 @@
namespace arrow {
namespace io {
-// Implement MemoryMappedFile
-
-class MemoryMappedFile::MemoryMappedFileImpl {
- public:
- MemoryMappedFileImpl()
- : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {}
-
- ~MemoryMappedFileImpl() {
- if (is_open_) {
- munmap(data_, size_);
- fclose(file_);
- }
- }
-
- Status Open(const std::string& path, FileMode::type mode) {
- if (is_open_) { return Status::IOError("A file is already open"); }
-
- int prot_flags = PROT_READ;
-
- if (mode == FileMode::READWRITE) {
- file_ = fopen(path.c_str(), "r+b");
- prot_flags |= PROT_WRITE;
- is_writable_ = true;
- } else {
- file_ = fopen(path.c_str(), "rb");
- }
- if (file_ == nullptr) {
- std::stringstream ss;
- ss << "Unable to open file, errno: " << errno;
- return Status::IOError(ss.str());
- }
-
- fseek(file_, 0L, SEEK_END);
- if (ferror(file_)) { return Status::IOError("Unable to seek to end of file"); }
- size_ = ftell(file_);
-
- fseek(file_, 0L, SEEK_SET);
- is_open_ = true;
- position_ = 0;
-
- void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0);
- if (result == MAP_FAILED) {
- std::stringstream ss;
- ss << "Memory mapping file failed, errno: " << errno;
- return Status::IOError(ss.str());
- }
- data_ = reinterpret_cast<uint8_t*>(result);
-
- return Status::OK();
- }
-
- int64_t size() const { return size_; }
-
- Status Seek(int64_t position) {
- if (position < 0 || position >= size_) {
- return Status::Invalid("position is out of bounds");
- }
- position_ = position;
- return Status::OK();
- }
-
- int64_t position() { return position_; }
-
- void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); }
-
- uint8_t* data() { return data_; }
-
- uint8_t* head() { return data_ + position_; }
-
- bool writable() { return is_writable_; }
-
- bool opened() { return is_open_; }
-
- private:
- FILE* file_;
- int64_t position_;
- int64_t size_;
- bool is_open_;
- bool is_writable_;
-
- // The memory map
- uint8_t* data_;
-};
-
-MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
- ReadableFileInterface::set_mode(mode);
-}
-
-MemoryMappedFile::~MemoryMappedFile() {}
-
-Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
- std::shared_ptr<MemoryMappedFile>* out) {
- std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile(mode));
-
- result->impl_.reset(new MemoryMappedFileImpl());
- RETURN_NOT_OK(result->impl_->Open(path, mode));
-
- *out = result;
- return Status::OK();
-}
-
-Status MemoryMappedFile::GetSize(int64_t* size) {
- *size = impl_->size();
- return Status::OK();
-}
-
-Status MemoryMappedFile::Tell(int64_t* position) {
- *position = impl_->position();
- return Status::OK();
-}
-
-Status MemoryMappedFile::Seek(int64_t position) {
- return impl_->Seek(position);
-}
-
-Status MemoryMappedFile::Close() {
- // munmap handled in pimpl dtor
- return Status::OK();
-}
-
-Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
- nbytes = std::min(nbytes, impl_->size() - impl_->position());
- std::memcpy(out, impl_->head(), nbytes);
- *bytes_read = nbytes;
- impl_->advance(nbytes);
- return Status::OK();
-}
-
-Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
- nbytes = std::min(nbytes, impl_->size() - impl_->position());
- *out = std::make_shared<Buffer>(impl_->head(), nbytes);
- impl_->advance(nbytes);
- return Status::OK();
-}
-
-bool MemoryMappedFile::supports_zero_copy() const {
- return true;
-}
-
-Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
- if (!impl_->opened() || !impl_->writable()) {
- return Status::IOError("Unable to write");
- }
-
- RETURN_NOT_OK(impl_->Seek(position));
- return WriteInternal(data, nbytes);
-}
-
-Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
- if (!impl_->opened() || !impl_->writable()) {
- return Status::IOError("Unable to write");
- }
- if (nbytes + impl_->position() > impl_->size()) {
- return Status::Invalid("Cannot write past end of memory map");
- }
-
- return WriteInternal(data, nbytes);
-}
-
-Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
- memcpy(impl_->head(), data, nbytes);
- impl_->advance(nbytes);
- return Status::OK();
-}
-
// ----------------------------------------------------------------------
// OutputStream that writes to resizable buffer
http://git-wip-us.apache.org/repos/asf/arrow/blob/d9df5567/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 2faf280..2f1d8ec 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -58,45 +58,6 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
uint8_t* mutable_data_;
};
-// A memory source that uses memory-mapped files for memory interactions
-class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
- public:
- ~MemoryMappedFile();
-
- static Status Open(const std::string& path, FileMode::type mode,
- std::shared_ptr<MemoryMappedFile>* out);
-
- Status Close() override;
-
- Status Tell(int64_t* position) override;
-
- Status Seek(int64_t position) override;
-
- // Required by ReadableFileInterface, copies memory into out
- Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
-
- // Zero copy read
- Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
-
- bool supports_zero_copy() const override;
-
- Status Write(const uint8_t* data, int64_t nbytes) override;
-
- Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;
-
- // @return: the size in bytes of the memory source
- Status GetSize(int64_t* size) override;
-
- private:
- explicit MemoryMappedFile(FileMode::type mode);
-
- Status WriteInternal(const uint8_t* data, int64_t nbytes);
-
- // Hide the internal details of this class for now
- class ARROW_NO_EXPORT MemoryMappedFileImpl;
- std::unique_ptr<MemoryMappedFileImpl> impl_;
-};
-
class ARROW_EXPORT BufferReader : public ReadableFileInterface {
public:
explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
http://git-wip-us.apache.org/repos/asf/arrow/blob/d9df5567/cpp/src/arrow/io/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h
index 1468083..6e91713 100644
--- a/cpp/src/arrow/io/test-common.h
+++ b/cpp/src/arrow/io/test-common.h
@@ -33,6 +33,7 @@
#endif
#include "arrow/buffer.h"
+#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/test-util.h"