You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/04/21 14:05:12 UTC
[arrow] branch master updated: ARROW-1018: [C++] Create
FileOutputStream, ReadableFile from file descriptor
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 54df19d ARROW-1018: [C++] Create FileOutputStream, ReadableFile from file descriptor
54df19d is described below
commit 54df19db82eca92ffddbdcd97aab723d5c393f5f
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Sat Apr 21 15:05:08 2018 +0100
ARROW-1018: [C++] Create FileOutputStream, ReadableFile from file descriptor
Also:
- move platform-specific code to `util/io-util.h`
- backport the `FileReadAt` helper from PR #1867 (to minimize conflicts)
- add a pipe-writing benchmark for more realistic numbers
Author: Antoine Pitrou <an...@python.org>
Closes #1909 from pitrou/ARROW-1018-open-fd and squashes the following commits:
0153b150 <Antoine Pitrou> Add pipe-writing benchmark
57ea1657 <Antoine Pitrou> ARROW-1018: Allow creating file objects from a file descriptor
---
.travis.yml | 2 +-
cpp/src/arrow/CMakeLists.txt | 3 +-
cpp/src/arrow/io/file.cc | 414 ++++++---------------------------
cpp/src/arrow/io/file.h | 37 +++
cpp/src/arrow/io/io-file-benchmark.cc | 181 +++++++++++++--
cpp/src/arrow/io/io-file-test.cc | 160 +++++++++++--
cpp/src/arrow/test-util.h | 5 +-
cpp/src/arrow/util/io-util.cc | 418 ++++++++++++++++++++++++++++++++++
cpp/src/arrow/util/io-util.h | 51 +++++
9 files changed, 893 insertions(+), 378 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 2fd31be..59ad4bc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -87,7 +87,7 @@ matrix:
before_script:
- if [ $ARROW_CI_PYTHON_AFFECTED != "1" ]; then exit; fi
# If either C++ or Python changed, we must install the C++ libraries
- - travis_wait 50 $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
+ - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
script:
- if [ $ARROW_CI_CPP_AFFECTED == "1" ]; then $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh; fi
- $TRAVIS_BUILD_DIR/ci/travis_build_parquet_cpp.sh
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index da7e24d..a3997c7 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -40,11 +40,12 @@ set(ARROW_SRCS
util/cpu-info.cc
util/decimal.cc
util/hash.cc
+ util/io-util.cc
util/key_value_metadata.cc
)
if ("${COMPILER_FAMILY}" STREQUAL "clang")
- set_property(SOURCE io/file.cc
+ set_property(SOURCE util/io-util.cc
APPEND_STRING
PROPERTY COMPILE_FLAGS
" -Wno-unused-macros ")
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 008f2b2..ba012be 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -15,37 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-// Ensure 64-bit off_t for platforms where it matters
-#ifdef _FILE_OFFSET_BITS
-#undef _FILE_OFFSET_BITS
-#endif
-
-#define _FILE_OFFSET_BITS 64
-
-// define max read/write count
-#if defined(_MSC_VER)
-#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
-#else
-
-#ifdef __APPLE__
-// due to macOS bug, we need to set read/write max
-#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
-#else
-// see notes on Linux read/write manpage
-#define ARROW_MAX_IO_CHUNKSIZE 0x7ffff000
-#endif
-
-#endif
-
-#include "arrow/io/file.h"
-
-#if _WIN32 || _WIN64
-#if _WIN64
-#define ENVIRONMENT64
-#else
-#define ENVIRONMENT32
-#endif
-#endif
+#include "arrow/io/windows_compatibility.h"
// sys/mman.h not present in Visual Studio or Cygwin
#ifdef _WIN32
@@ -55,334 +25,88 @@
#include "arrow/io/mman.h"
#undef Realloc
#undef Free
-#include <windows.h>
#else
#include <sys/mman.h>
#endif
+#include <string.h>
+
#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <mutex>
-#include <sstream> // IWYU pragma: keep
-
-#if defined(_MSC_VER)
-#include <codecvt>
-#include <locale>
-#endif
-
-#include <fcntl.h>
-#include <sys/stat.h>
-#include <sys/types.h> // IWYU pragma: keep
-
-#ifndef _MSC_VER // POSIX-like platforms
-
-#include <unistd.h>
-
-// Not available on some platforms
-#ifndef errno_t
-#define errno_t int
-#endif
-
-#endif // _MSC_VER
-
-// defines that don't exist in MinGW
-#if defined(__MINGW32__)
-#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR
-#elif defined(_MSC_VER) // Visual Studio
-
-#else // gcc / clang on POSIX platforms
-#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
-#endif
-
-// ----------------------------------------------------------------------
-// file compatibility stuff
-
-#if defined(__MINGW32__) // MinGW
-// nothing
-#elif defined(_MSC_VER) // Visual Studio
-#include <io.h>
-#else // POSIX / Linux
-// nothing
-#endif
-
-// POSIX systems do not have this
-#ifndef O_BINARY
-#define O_BINARY 0
-#endif
+#include <sstream>
// ----------------------------------------------------------------------
// Other Arrow includes
+#include "arrow/io/file.h"
#include "arrow/io/interfaces.h"
#include "arrow/buffer.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
+#include "arrow/util/io-util.h"
#include "arrow/util/logging.h"
-#if defined(_MSC_VER)
-#include <boost/filesystem.hpp> // NOLINT
-#include <boost/system/system_error.hpp> // NOLINT
-namespace fs = boost::filesystem;
-#define PlatformFilename fs::path
-
-namespace arrow {
-namespace io {
-
-#else
namespace arrow {
namespace io {
-struct PlatformFilename {
- PlatformFilename() {}
- explicit PlatformFilename(const std::string& path) { utf8_path = path; }
-
- const char* c_str() const { return utf8_path.c_str(); }
-
- const std::string& string() const { return utf8_path; }
-
- size_t length() const { return utf8_path.size(); }
-
- std::string utf8_path;
-};
-#endif
-
-static inline Status CheckFileOpResult(int ret, int errno_actual,
- const PlatformFilename& file_name,
- const std::string& opname) {
- if (ret == -1) {
- std::stringstream ss;
- ss << "Failed to " << opname << " file: " << file_name.string();
- ss << " , error: " << std::strerror(errno_actual);
- return Status::IOError(ss.str());
- }
- return Status::OK();
-}
-
-#define CHECK_LSEEK(retval) \
- if ((retval) == -1) return Status::IOError("lseek failed");
-
-static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) {
-#if defined(_MSC_VER)
- return _lseeki64(fd, pos, whence);
-#else
- return lseek(fd, pos, whence);
-#endif
-}
-
-static inline Status FileOpenReadable(const PlatformFilename& file_name, int* fd) {
- int ret;
- errno_t errno_actual = 0;
-#if defined(_MSC_VER)
- errno_actual = _wsopen_s(fd, file_name.wstring().c_str(), _O_RDONLY | _O_BINARY,
- _SH_DENYNO, _S_IREAD);
- ret = *fd;
-#else
- ret = *fd = open(file_name.c_str(), O_RDONLY | O_BINARY);
- errno_actual = errno;
-#endif
-
- return CheckFileOpResult(ret, errno_actual, file_name, "open local");
-}
-
-static inline Status FileOpenWriteable(const PlatformFilename& file_name, bool write_only,
- bool truncate, int* fd) {
- int ret;
- errno_t errno_actual = 0;
-
-#if defined(_MSC_VER)
- int oflag = _O_CREAT | _O_BINARY;
- int pmode = _S_IWRITE;
- if (!write_only) {
- pmode |= _S_IREAD;
- }
-
- if (truncate) {
- oflag |= _O_TRUNC;
- }
-
- if (write_only) {
- oflag |= _O_WRONLY;
- } else {
- oflag |= _O_RDWR;
- }
-
- errno_actual = _wsopen_s(fd, file_name.wstring().c_str(), oflag, _SH_DENYNO, pmode);
- ret = *fd;
-
-#else
- int oflag = O_CREAT | O_BINARY;
-
- if (truncate) {
- oflag |= O_TRUNC;
- }
-
- if (write_only) {
- oflag |= O_WRONLY;
- } else {
- oflag |= O_RDWR;
- }
-
- ret = *fd = open(file_name.c_str(), oflag, ARROW_WRITE_SHMODE);
-#endif
- return CheckFileOpResult(ret, errno_actual, file_name, "open local");
-}
-
-static inline Status FileTell(int fd, int64_t* pos) {
- int64_t current_pos;
-
-#if defined(_MSC_VER)
- current_pos = _telli64(fd);
- if (current_pos == -1) {
- return Status::IOError("_telli64 failed");
- }
-#else
- current_pos = lseek64_compat(fd, 0, SEEK_CUR);
- CHECK_LSEEK(current_pos);
-#endif
-
- *pos = current_pos;
- return Status::OK();
-}
-
-static inline Status FileSeek(int fd, int64_t pos) {
- int64_t ret = lseek64_compat(fd, pos, SEEK_SET);
- CHECK_LSEEK(ret);
- return Status::OK();
-}
-
-static inline Status FileRead(const int fd, uint8_t* buffer, const int64_t nbytes,
- int64_t* bytes_read) {
- *bytes_read = 0;
-
- while (*bytes_read != -1 && *bytes_read < nbytes) {
- int64_t chunksize =
- std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - *bytes_read);
-#if defined(_MSC_VER)
- int64_t ret = static_cast<int64_t>(
- _read(fd, buffer + *bytes_read, static_cast<uint32_t>(chunksize)));
-#else
- int64_t ret = static_cast<int64_t>(
- read(fd, buffer + *bytes_read, static_cast<size_t>(chunksize)));
-#endif
-
- if (ret != -1) {
- *bytes_read += ret;
- if (ret < chunksize) {
- // EOF
- break;
- }
- } else {
- *bytes_read = ret;
- }
- }
-
- if (*bytes_read == -1) {
- return Status::IOError(std::string("Error reading bytes from file: ") +
- std::string(strerror(errno)));
- }
-
- return Status::OK();
-}
-
-static inline Status FileWrite(const int fd, const uint8_t* buffer,
- const int64_t nbytes) {
- int ret = 0;
- int64_t bytes_written = 0;
-
- while (ret != -1 && bytes_written < nbytes) {
- int64_t chunksize =
- std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - bytes_written);
-#if defined(_MSC_VER)
- ret = static_cast<int>(
- _write(fd, buffer + bytes_written, static_cast<uint32_t>(chunksize)));
-#else
- ret = static_cast<int>(
- write(fd, buffer + bytes_written, static_cast<size_t>(chunksize)));
-#endif
-
- if (ret != -1) {
- bytes_written += ret;
- }
- }
-
- if (ret == -1) {
- return Status::IOError(std::string("Error writing bytes from file: ") +
- std::string(strerror(errno)));
- }
- return Status::OK();
-}
-
-static inline Status FileGetSize(int fd, int64_t* size) {
- int64_t ret;
-
- // Save current position
- int64_t current_position = lseek64_compat(fd, 0, SEEK_CUR);
- CHECK_LSEEK(current_position);
-
- // move to end of the file
- ret = lseek64_compat(fd, 0, SEEK_END);
- CHECK_LSEEK(ret);
-
- // Get file length
- ret = lseek64_compat(fd, 0, SEEK_CUR);
- CHECK_LSEEK(ret);
-
- *size = ret;
-
- // Restore file position
- ret = lseek64_compat(fd, current_position, SEEK_SET);
- CHECK_LSEEK(ret);
-
- return Status::OK();
-}
-
-static inline Status FileClose(int fd) {
- int ret;
-
-#if defined(_MSC_VER)
- ret = static_cast<int>(_close(fd));
-#else
- ret = static_cast<int>(close(fd));
-#endif
-
- if (ret == -1) {
- return Status::IOError("error closing file");
- }
- return Status::OK();
-}
-
class OSFile {
public:
OSFile() : fd_(-1), is_open_(false), size_(-1) {}
~OSFile() {}
+ // Note: only one of the Open* methods below may be called on a given instance
+
Status OpenWriteable(const std::string& path, bool append, bool write_only) {
RETURN_NOT_OK(SetFileName(path));
- RETURN_NOT_OK(FileOpenWriteable(file_name_, write_only, !append, &fd_));
+ RETURN_NOT_OK(internal::FileOpenWriteable(file_name_, write_only, !append, &fd_));
is_open_ = true;
mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
if (append) {
- RETURN_NOT_OK(FileGetSize(fd_, &size_));
+ RETURN_NOT_OK(internal::FileGetSize(fd_, &size_));
} else {
size_ = 0;
}
return Status::OK();
}
+  // Adopt an already-open descriptor for writing. Unlike
+  // OpenWriteable(string, ...), this neither truncates the file nor requires
+  // the descriptor to be seekable; when its size cannot be determined
+  // (e.g. a pipe), size_ is left at -1.
+  Status OpenWriteable(int fd) {
+    const bool has_size = internal::FileGetSize(fd, &size_).ok();
+    if (!has_size) {
+      size_ = -1;  // non-seekable file: size unknown
+    }
+    RETURN_NOT_OK(SetFileName(fd));
+    fd_ = fd;
+    mode_ = FileMode::WRITE;
+    is_open_ = true;
+    return Status::OK();
+  }
+
Status OpenReadable(const std::string& path) {
RETURN_NOT_OK(SetFileName(path));
- RETURN_NOT_OK(FileOpenReadable(file_name_, &fd_));
- RETURN_NOT_OK(FileGetSize(fd_, &size_));
+ RETURN_NOT_OK(internal::FileOpenReadable(file_name_, &fd_));
+ RETURN_NOT_OK(internal::FileGetSize(fd_, &size_));
+
+ is_open_ = true;
+ mode_ = FileMode::READ;
+ return Status::OK();
+ }
+ Status OpenReadable(int fd) {
+ RETURN_NOT_OK(internal::FileGetSize(fd, &size_));
+ RETURN_NOT_OK(SetFileName(fd));
is_open_ = true;
mode_ = FileMode::READ;
+ fd_ = fd;
return Status::OK();
}
@@ -391,13 +115,13 @@ class OSFile {
// Even if closing fails, the fd will likely be closed (perhaps it's
// already closed).
is_open_ = false;
- RETURN_NOT_OK(FileClose(fd_));
+ RETURN_NOT_OK(internal::FileClose(fd_));
}
return Status::OK();
}
Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
- return FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes, bytes_read);
+ return internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes, bytes_read);
}
Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
@@ -410,17 +134,17 @@ class OSFile {
if (pos < 0) {
return Status::Invalid("Invalid position");
}
- return FileSeek(fd_, pos);
+ return internal::FileSeek(fd_, pos);
}
- Status Tell(int64_t* pos) const { return FileTell(fd_, pos); }
+ Status Tell(int64_t* pos) const { return internal::FileTell(fd_, pos); }
Status Write(const void* data, int64_t length) {
std::lock_guard<std::mutex> guard(lock_);
if (length < 0) {
return Status::IOError("Length must be non-negative");
}
- return FileWrite(fd_, reinterpret_cast<const uint8_t*>(data), length);
+ return internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data), length);
}
int fd() const { return fd_; }
@@ -435,20 +159,15 @@ class OSFile {
protected:
Status SetFileName(const std::string& file_name) {
-#if defined(_MSC_VER)
- try {
- std::codecvt_utf8_utf16<wchar_t> utf16_converter;
- file_name_.assign(file_name, utf16_converter);
- } catch (boost::system::system_error& e) {
- return Status::Invalid(e.what());
- }
-#else
- file_name_ = PlatformFilename(file_name);
-#endif
- return Status::OK();
+ return internal::FileNameFromString(file_name, &file_name_);
+ }
+ Status SetFileName(int fd) {
+ std::stringstream ss;
+ ss << "<fd " << fd << ">";
+ return SetFileName(ss.str());
}
- PlatformFilename file_name_;
+ internal::PlatformFilename file_name_;
std::mutex lock_;
@@ -469,6 +188,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {}
Status Open(const std::string& path) { return OpenReadable(path); }
+ Status Open(int fd) { return OpenReadable(fd); }
Status ReadBuffer(int64_t nbytes, std::shared_ptr<Buffer>* out) {
std::shared_ptr<ResizableBuffer> buffer;
@@ -492,8 +212,7 @@ ReadableFile::ReadableFile(MemoryPool* pool) { impl_.reset(new ReadableFileImpl(
ReadableFile::~ReadableFile() { DCHECK(impl_->Close().ok()); }
Status ReadableFile::Open(const std::string& path, std::shared_ptr<ReadableFile>* file) {
- *file = std::shared_ptr<ReadableFile>(new ReadableFile(default_memory_pool()));
- return (*file)->impl_->Open(path);
+ return Open(path, default_memory_pool(), file);
}
Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool,
@@ -502,6 +221,16 @@ Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool,
return (*file)->impl_->Open(path);
}
+// Adopt the already-open descriptor `fd` as a ReadableFile allocating from
+// `memory_pool`. *file is populated before the descriptor is adopted, so it
+// is non-null even if opening fails.
+Status ReadableFile::Open(int fd, MemoryPool* memory_pool,
+                          std::shared_ptr<ReadableFile>* file) {
+  file->reset(new ReadableFile(memory_pool));
+  return (*file)->impl_->Open(fd);
+}
+
+// Same as above, allocating from the default memory pool.
+Status ReadableFile::Open(int fd, std::shared_ptr<ReadableFile>* file) {
+  MemoryPool* pool = default_memory_pool();
+  return Open(fd, pool, file);
+}
+
Status ReadableFile::Close() { return impl_->Close(); }
Status ReadableFile::Tell(int64_t* pos) const { return impl_->Tell(pos); }
@@ -545,8 +274,9 @@ int ReadableFile::file_descriptor() const { return impl_->fd(); }
class FileOutputStream::FileOutputStreamImpl : public OSFile {
public:
Status Open(const std::string& path, bool append) {
- return OpenWriteable(path, append, true);
+ return OpenWriteable(path, append, true /* write_only */);
}
+ Status Open(int fd) { return OpenWriteable(fd); }
};
FileOutputStream::FileOutputStream() { impl_.reset(new FileOutputStreamImpl()); }
@@ -567,6 +297,11 @@ Status FileOutputStream::Open(const std::string& path, bool append,
return std::static_pointer_cast<FileOutputStream>(*out)->impl_->Open(path, append);
}
+Status FileOutputStream::Open(int fd, std::shared_ptr<OutputStream>* out) {
+  // Build the concrete stream, publish it through the base-class pointer,
+  // then adopt `fd` (the stream takes ownership of the descriptor).
+  std::shared_ptr<FileOutputStream> stream(new FileOutputStream());
+  *out = stream;
+  return stream->impl_->Open(fd);
+}
+
Status FileOutputStream::Open(const std::string& path,
std::shared_ptr<FileOutputStream>* file) {
return Open(path, false, file);
@@ -579,6 +314,11 @@ Status FileOutputStream::Open(const std::string& path, bool append,
return (*file)->impl_->Open(path, append);
}
+Status FileOutputStream::Open(int fd, std::shared_ptr<FileOutputStream>* file) {
+  // *file is set before the descriptor is adopted, so it is non-null even
+  // when opening fails.
+  file->reset(new FileOutputStream());
+  return (*file)->impl_->Open(fd);
+}
+
Status FileOutputStream::Close() { return impl_->Close(); }
Status FileOutputStream::Tell(int64_t* pos) const { return impl_->Tell(pos); }
@@ -682,20 +422,10 @@ MemoryMappedFile::~MemoryMappedFile() {}
Status MemoryMappedFile::Create(const std::string& path, int64_t size,
std::shared_ptr<MemoryMappedFile>* out) {
- int ret;
- errno_t errno_actual;
std::shared_ptr<FileOutputStream> file;
RETURN_NOT_OK(FileOutputStream::Open(path, &file));
-#ifdef _MSC_VER
- errno_actual = _chsize_s(file->file_descriptor(), static_cast<size_t>(size));
- ret = errno_actual == 0 ? 0 : -1;
-#else
- ret = ftruncate(file->file_descriptor(), static_cast<size_t>(size));
- errno_actual = errno;
-#endif
-
- RETURN_NOT_OK(CheckFileOpResult(ret, errno_actual, PlatformFilename(path), "truncate"));
+ RETURN_NOT_OK(internal::FileTruncate(file->file_descriptor(), size));
RETURN_NOT_OK(file->Close());
return MemoryMappedFile::Open(path, FileMode::READWRITE, out);
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index a1f9edc..c2572da 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -54,6 +54,15 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
static Status Open(const std::string& path, bool append,
std::shared_ptr<OutputStream>* out);
+  /// \brief Wrap an already-open file descriptor for writing. The
+  /// underlying file is not truncated.
+  /// \param[in] fd file descriptor to write to
+  /// \param[out] out the new stream, as a base-interface OutputStream
+  ///
+  /// Ownership of the descriptor passes to the stream, which closes it on
+  /// Close() or destruction.
+  static Status Open(int fd, std::shared_ptr<OutputStream>* out);
+
/// \brief Open a local file for writing, truncating any existing file
/// \param[in] path with UTF8 encoding
/// \param[out] file a FileOutputStream instance
@@ -69,6 +78,15 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
static Status Open(const std::string& path, bool append,
std::shared_ptr<FileOutputStream>* file);
+  /// \brief Wrap an already-open file descriptor for writing. The
+  /// underlying file is not truncated.
+  /// \param[in] fd file descriptor to write to
+  /// \param[out] out the new stream, as a FileOutputStream
+  ///
+  /// Ownership of the descriptor passes to the stream, which closes it on
+  /// Close() or destruction.
+  static Status Open(int fd, std::shared_ptr<FileOutputStream>* out);
+
// OutputStream interface
Status Close() override;
Status Tell(int64_t* position) const override;
@@ -104,6 +122,25 @@ class ARROW_EXPORT ReadableFile : public RandomAccessFile {
static Status Open(const std::string& path, MemoryPool* pool,
std::shared_ptr<ReadableFile>* file);
+  /// \brief Open a local file for reading from an existing file descriptor
+  /// \param[in] fd file descriptor; must refer to a seekable file
+  /// \param[out] file ReadableFile instance
+  /// Memory is allocated from the default memory pool.
+  ///
+  /// The file descriptor becomes owned by the ReadableFile, and will be closed
+  /// on Close() or destruction. If opening fails (e.g. the descriptor is not
+  /// seekable), the descriptor is not adopted and the caller remains
+  /// responsible for closing it.
+  static Status Open(int fd, std::shared_ptr<ReadableFile>* file);
+
+  /// \brief Open a local file for reading from an existing file descriptor
+  /// \param[in] fd file descriptor; must refer to a seekable file
+  /// \param[in] pool a MemoryPool for memory allocations
+  /// \param[out] file ReadableFile instance
+  /// Open file with one's own memory pool for memory allocations
+  ///
+  /// The file descriptor becomes owned by the ReadableFile, and will be closed
+  /// on Close() or destruction. If opening fails (e.g. the descriptor is not
+  /// seekable), the descriptor is not adopted and the caller remains
+  /// responsible for closing it.
+  static Status Open(int fd, MemoryPool* pool, std::shared_ptr<ReadableFile>* file);
+
Status Close() override;
Status Tell(int64_t* position) const override;
diff --git a/cpp/src/arrow/io/io-file-benchmark.cc b/cpp/src/arrow/io/io-file-benchmark.cc
index bb35b08..e5a326e 100644
--- a/cpp/src/arrow/io/io-file-benchmark.cc
+++ b/cpp/src/arrow/io/io-file-benchmark.cc
@@ -19,25 +19,109 @@
#include "arrow/io/buffered.h"
#include "arrow/io/file.h"
#include "arrow/test-util.h"
+#include "arrow/util/io-util.h"
#include "benchmark/benchmark.h"
#include <algorithm>
+#include <atomic>
+#include <cerrno>
+#include <cstdlib>
#include <iostream>
+#include <thread>
#include <valarray>
+#include <fcntl.h>
+#include <poll.h>
+#include <unistd.h>
+
namespace arrow {
-// XXX Writing to /dev/null is irrealistic as the kernel likely doesn't
-// copy the data at all. Use a socketpair instead?
std::string GetNullFile() { return "/dev/null"; }
const std::valarray<int64_t> small_sizes = {8, 24, 33, 1, 32, 192, 16, 40};
const std::valarray<int64_t> large_sizes = {8192, 100000};
+class BackgroundReader {
+  // A class that reads (and discards) data in the background from a file
+  // descriptor until Stop() is called.
+
+ public:
+  // Create a reader draining `fd` (closed by the destructor) and start its
+  // worker thread.
+  static std::shared_ptr<BackgroundReader> StartReader(int fd) {
+    std::shared_ptr<BackgroundReader> reader(new BackgroundReader(fd));
+    reader->worker_.reset(new std::thread([=] { reader->LoopReading(); }));
+    return reader;
+  }
+  // Ask the worker thread to exit by writing one byte to the self-pipe.
+  void Stop() {
+    const uint8_t data[] = "x";
+    ABORT_NOT_OK(internal::FileWrite(wakeup_w_, data, 1));
+  }
+  // Wait for the worker thread to exit (call after Stop()).
+  void Join() { worker_->join(); }
+
+  ~BackgroundReader() {
+    for (int fd : {fd_, wakeup_r_, wakeup_w_}) {
+      ABORT_NOT_OK(internal::FileClose(fd));
+    }
+  }
+
+ protected:
+  explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) {
+    // Prepare self-pipe trick
+    int wakeupfd[2];
+    ABORT_NOT_OK(internal::CreatePipe(wakeupfd));
+    wakeup_r_ = wakeupfd[0];
+    wakeup_w_ = wakeupfd[1];
+    // Put fd in non-blocking mode, preserving its other status flags
+    const int flags = fcntl(fd, F_GETFL);
+    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+      std::cerr << "fcntl() failed\n";
+      abort();
+    }
+  }
+
+  void LoopReading() {
+    struct pollfd pollfds[2];
+    pollfds[0].fd = fd_;
+    pollfds[0].events = POLLIN;
+    pollfds[1].fd = wakeup_r_;
+    pollfds[1].events = POLLIN;
+    while (true) {
+      int ret = poll(pollfds, 2, -1 /* timeout */);
+      if (ret < 0 && errno == EINTR) {
+        // Interrupted by a signal before any event fired: just retry
+        continue;
+      }
+      if (ret < 1) {
+        std::cerr << "poll() failed with code " << ret << "\n";
+        abort();
+      }
+      if (pollfds[1].revents & POLLIN) {
+        // We're done
+        break;
+      }
+      if (!(pollfds[0].revents & POLLIN)) {
+        continue;
+      }
+      int64_t bytes_read = 0;
+      // There could be a spurious wakeup followed by EAGAIN. FileRead then
+      // returns an error and may leave bytes_read negative, so only count
+      // successful reads.
+      if (internal::FileRead(fd_, buffer_, buffer_size_, &bytes_read).ok()) {
+        total_bytes_ += bytes_read;
+      }
+    }
+  }
+
+  int fd_, wakeup_r_, wakeup_w_;
+  int64_t total_bytes_;
+
+  static const int64_t buffer_size_ = 16384;
+  uint8_t buffer_[buffer_size_];
+
+  std::unique_ptr<std::thread> worker_;
+};
+
+// Create a pipe whose write end is wrapped in a FileOutputStream (returned in
+// *stream) and whose read end is drained by a newly started BackgroundReader
+// (returned in *reader). Aborts on failure.
+static void SetupPipeWriter(std::shared_ptr<io::OutputStream>* stream,
+                            std::shared_ptr<BackgroundReader>* reader) {
+  int pipe_fds[2];
+  ABORT_NOT_OK(internal::CreatePipe(pipe_fds));
+  const int read_end = pipe_fds[0];
+  const int write_end = pipe_fds[1];
+  ABORT_NOT_OK(io::FileOutputStream::Open(write_end, stream));
+  *reader = BackgroundReader::StartReader(read_end);
+}
+
static void BenchmarkStreamingWrites(benchmark::State& state,
std::valarray<int64_t> sizes,
- io::OutputStream* stream) {
+ io::OutputStream* stream,
+ BackgroundReader* reader = nullptr) {
const std::string datastr(*std::max_element(std::begin(sizes), std::end(sizes)), 'x');
const void* data = datastr.data();
const int64_t sum_sizes = sizes.sum();
@@ -47,10 +131,23 @@ static void BenchmarkStreamingWrites(benchmark::State& state,
ABORT_NOT_OK(stream->Write(data, size));
}
}
- state.SetBytesProcessed(int64_t(state.iterations()) * sum_sizes);
+ const int64_t total_bytes = static_cast<int64_t>(state.iterations()) * sum_sizes;
+ state.SetBytesProcessed(total_bytes);
+
+ if (reader != nullptr) {
+ // Wake up and stop
+ reader->Stop();
+ reader->Join();
+ }
+ ABORT_NOT_OK(stream->Close());
}
-static void BM_FileOutputStreamSmallWrites(
+// Benchmark writing to /dev/null
+//
+// This situation is unrealistic as the kernel likely doesn't
+// copy the data at all, so we only measure small writes.
+
+static void BM_FileOutputStreamSmallWritesToNull(
benchmark::State& state) { // NOLINT non-const reference
std::shared_ptr<io::OutputStream> stream;
ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
@@ -58,38 +155,84 @@ static void BM_FileOutputStreamSmallWrites(
BenchmarkStreamingWrites(state, small_sizes, stream.get());
}
-static void BM_FileOutputStreamLargeWrites(
+static void BM_BufferedOutputStreamSmallWritesToNull(
benchmark::State& state) { // NOLINT non-const reference
std::shared_ptr<io::OutputStream> stream;
ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
+ stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
- BenchmarkStreamingWrites(state, large_sizes, stream.get());
+ BenchmarkStreamingWrites(state, small_sizes, stream.get());
}
-static void BM_BufferedOutputStreamSmallWrites(
+// Benchmark writing to a pipe
+//
+// This is slightly more realistic than the above
+
+static void BM_FileOutputStreamSmallWritesToPipe(
benchmark::State& state) { // NOLINT non-const reference
std::shared_ptr<io::OutputStream> stream;
- ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
- stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+ std::shared_ptr<BackgroundReader> reader;
+ SetupPipeWriter(&stream, &reader);
- BenchmarkStreamingWrites(state, small_sizes, stream.get());
+ BenchmarkStreamingWrites(state, small_sizes, stream.get(), reader.get());
}
-static void BM_BufferedOutputStreamLargeWrites(
+static void BM_FileOutputStreamLargeWritesToPipe(
benchmark::State& state) { // NOLINT non-const reference
std::shared_ptr<io::OutputStream> stream;
- ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
- stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+ std::shared_ptr<BackgroundReader> reader;
+ SetupPipeWriter(&stream, &reader);
- BenchmarkStreamingWrites(state, large_sizes, stream.get());
+ BenchmarkStreamingWrites(state, large_sizes, stream.get(), reader.get());
}
-BENCHMARK(BM_FileOutputStreamSmallWrites)->Repetitions(2)->MinTime(1.0);
+static void BM_BufferedOutputStreamSmallWritesToPipe(
+ benchmark::State& state) { // NOLINT non-const reference
+ std::shared_ptr<io::OutputStream> stream;
+ std::shared_ptr<BackgroundReader> reader;
+ SetupPipeWriter(&stream, &reader);
+ stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+
+ BenchmarkStreamingWrites(state, small_sizes, stream.get(), reader.get());
+}
-BENCHMARK(BM_FileOutputStreamLargeWrites)->Repetitions(2)->MinTime(1.0);
+static void BM_BufferedOutputStreamLargeWritesToPipe(
+ benchmark::State& state) { // NOLINT non-const reference
+ std::shared_ptr<io::OutputStream> stream;
+ std::shared_ptr<BackgroundReader> reader;
+ SetupPipeWriter(&stream, &reader);
+ stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
-BENCHMARK(BM_BufferedOutputStreamSmallWrites)->Repetitions(2)->MinTime(1.0);
+ BenchmarkStreamingWrites(state, large_sizes, stream.get(), reader.get());
+}
-BENCHMARK(BM_BufferedOutputStreamLargeWrites)->Repetitions(2)->MinTime(1.0);
+// We use real time as we don't want to count CPU time spent in the
+// BackgroundReader thread
+
+BENCHMARK(BM_FileOutputStreamSmallWritesToNull)
+ ->Repetitions(2)
+ ->MinTime(1.0)
+ ->UseRealTime();
+BENCHMARK(BM_FileOutputStreamSmallWritesToPipe)
+ ->Repetitions(2)
+ ->MinTime(1.0)
+ ->UseRealTime();
+BENCHMARK(BM_FileOutputStreamLargeWritesToPipe)
+ ->Repetitions(2)
+ ->MinTime(1.0)
+ ->UseRealTime();
+
+BENCHMARK(BM_BufferedOutputStreamSmallWritesToNull)
+ ->Repetitions(2)
+ ->MinTime(1.0)
+ ->UseRealTime();
+BENCHMARK(BM_BufferedOutputStreamSmallWritesToPipe)
+ ->Repetitions(2)
+ ->MinTime(1.0)
+ ->UseRealTime();
+BENCHMARK(BM_BufferedOutputStreamLargeWritesToPipe)
+ ->Repetitions(2)
+ ->MinTime(1.0)
+ ->UseRealTime();
} // namespace arrow
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 098e82f..d3ef908 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -36,6 +36,7 @@
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/test-util.h"
+#include "arrow/util/io-util.h"
namespace arrow {
namespace io {
@@ -43,7 +44,7 @@ namespace io {
class FileTestFixture : public ::testing::Test {
public:
void SetUp() {
- path_ = "arrow-test-io-file-output-stream.txt";
+ path_ = "arrow-test-io-file.txt";
EnsureFileDeleted();
}
@@ -68,6 +69,17 @@ class TestFileOutputStream : public FileTestFixture {
ASSERT_OK(FileOutputStream::Open(path_, append, &file_));
ASSERT_OK(FileOutputStream::Open(path_, append, &stream_));
}
+  // Open two independent write-only, non-truncating descriptors on path_
+  // and hand one to file_ and the other to stream_.
+  void OpenFileDescriptor() {
+    internal::PlatformFilename file_name;
+    ASSERT_OK(internal::FileNameFromString(path_, &file_name));
+    int fd = -1;
+    ASSERT_OK(internal::FileOpenWriteable(file_name, true /* write_only */,
+                                          false /* truncate */, &fd));
+    ASSERT_OK(FileOutputStream::Open(fd, &file_));
+    ASSERT_OK(internal::FileOpenWriteable(file_name, true /* write_only */,
+                                          false /* truncate */, &fd));
+    ASSERT_OK(FileOutputStream::Open(fd, &stream_));
+  }
protected:
std::shared_ptr<FileOutputStream> file_;
@@ -90,19 +102,27 @@ TEST_F(TestFileOutputStream, FileNameWideCharConversionRangeException) {
#endif
TEST_F(TestFileOutputStream, DestructorClosesFile) {
- int fd;
- {
- std::shared_ptr<FileOutputStream> file;
- ASSERT_OK(FileOutputStream::Open(path_, &file));
- fd = file->file_descriptor();
- }
- ASSERT_TRUE(FileIsClosed(fd));
- {
- std::shared_ptr<OutputStream> stream;
- ASSERT_OK(FileOutputStream::Open(path_, &stream));
- fd = std::static_pointer_cast<FileOutputStream>(stream)->file_descriptor();
- }
- ASSERT_TRUE(FileIsClosed(fd));
+ int fd_file, fd_stream;
+
+ OpenFile();
+ fd_file = file_->file_descriptor();
+ fd_stream = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor();
+ ASSERT_FALSE(FileIsClosed(fd_file));
+ file_.reset();
+ ASSERT_TRUE(FileIsClosed(fd_file));
+ ASSERT_FALSE(FileIsClosed(fd_stream));
+ stream_.reset();
+ ASSERT_TRUE(FileIsClosed(fd_stream));
+
+ OpenFileDescriptor();
+ fd_file = file_->file_descriptor();
+ fd_stream = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor();
+ ASSERT_FALSE(FileIsClosed(fd_file));
+ file_.reset();
+ ASSERT_TRUE(FileIsClosed(fd_file));
+ ASSERT_FALSE(FileIsClosed(fd_stream));
+ stream_.reset();
+ ASSERT_TRUE(FileIsClosed(fd_stream));
}
TEST_F(TestFileOutputStream, Close) {
@@ -132,6 +152,33 @@ TEST_F(TestFileOutputStream, Close) {
AssertFileContents(path_, data);
}
+// A FileOutputStream created from a descriptor writes into the file, closes
+// the descriptor on Close(), and does not truncate: a descriptor positioned
+// at end-of-file appends to the existing contents.
+TEST_F(TestFileOutputStream, FromFileDescriptor) {
+ OpenFileDescriptor();
+ // Only file_ is exercised at first; drop stream_ (its destructor closes it)
+ stream_.reset();
+
+ std::string data1 = "test";
+ ASSERT_OK(file_->Write(data1.data(), data1.size()));
+ int fd = file_->file_descriptor();
+ ASSERT_OK(file_->Close());
+ ASSERT_TRUE(FileIsClosed(fd));
+
+ AssertFileContents(path_, data1);
+
+ // Re-open at end of file
+ internal::PlatformFilename file_name;
+ ASSERT_OK(internal::FileNameFromString(path_, &file_name));
+ ASSERT_OK(internal::FileOpenWriteable(file_name, true /* write_only */,
+ false /* truncate */, &fd));
+ ASSERT_OK(internal::FileSeek(fd, 0, SEEK_END));
+ // Open(fd, ...) must not truncate, so data2 lands after data1
+ ASSERT_OK(FileOutputStream::Open(fd, &stream_));
+
+ std::string data2 = "data";
+ ASSERT_OK(stream_->Write(data2.data(), data2.size()));
+ ASSERT_OK(stream_->Close());
+
+ AssertFileContents(path_, data1 + data2);
+}
+
TEST_F(TestFileOutputStream, InvalidWrites) {
OpenFile();
@@ -225,6 +272,32 @@ TEST_F(TestReadableFile, Close) {
// Idempotent
ASSERT_OK(file_->Close());
+ ASSERT_TRUE(FileIsClosed(fd));
+}
+
+// Exercises ReadableFile construction from a raw descriptor whose position
+// was advanced beforehand: reads must start at that position, and closing the
+// file (even twice) must release the descriptor.
+TEST_F(TestReadableFile, FromFileDescriptor) {
+  MakeTestFile();
+
+  internal::PlatformFilename platform_name;
+  ASSERT_OK(internal::FileNameFromString(path_, &platform_name));
+  int raw_fd = -2;  // sentinel: FileOpenReadable must overwrite it
+  ASSERT_OK(internal::FileOpenReadable(platform_name, &raw_fd));
+  ASSERT_GE(raw_fd, 0);
+  // Skip the first four bytes before handing the descriptor over.
+  ASSERT_OK(internal::FileSeek(raw_fd, 4));
+
+  ASSERT_OK(ReadableFile::Open(raw_fd, &file_));
+  ASSERT_EQ(file_->file_descriptor(), raw_fd);
+
+  // Request one byte more than the test expects to remain; only the four
+  // remaining bytes should come back.
+  std::shared_ptr<Buffer> chunk;
+  ASSERT_OK(file_->Read(5, &chunk));
+  ASSERT_EQ(chunk->size(), 4);
+  ASSERT_TRUE(chunk->Equals(Buffer("data")));
+
+  ASSERT_FALSE(FileIsClosed(raw_fd));
+  // The second Close() checks idempotence.
+  for (int attempt = 0; attempt < 2; ++attempt) {
+    ASSERT_OK(file_->Close());
+    ASSERT_TRUE(FileIsClosed(raw_fd));
+  }
+}
TEST_F(TestReadableFile, SeekTellSize) {
@@ -407,6 +480,65 @@ TEST_F(TestReadableFile, ThreadSafety) {
}
// ----------------------------------------------------------------------
+// Pipe I/O tests using FileOutputStream
+// (cannot test using ReadableFile as it currently requires seeking)
+
+// Fixture owning both ends of an anonymous pipe. A test may hand either end
+// over to another object by setting the corresponding member to -1, in which
+// case TearDown() leaves that end alone.
+class TestPipeIO : public ::testing::Test {
+ public:
+  // Creates a pipe; r_ receives the read end and w_ the write end.
+  void MakePipe() {
+    int fd[2];
+    ASSERT_OK(internal::CreatePipe(fd));
+    r_ = fd[0];
+    w_ = fd[1];
+    ASSERT_GE(r_, 0);
+    ASSERT_GE(w_, 0);
+  }
+
+  // Closes whichever pipe ends the fixture still owns. A failure on one end
+  // is reported but does not prevent the other end from being closed.
+  void ClosePipe() {
+    CloseOwned(&r_);
+    CloseOwned(&w_);
+  }
+
+  void TearDown() override { ClosePipe(); }
+
+ protected:
+  // Forgets *fd before closing it so a failed close is never retried: POSIX
+  // leaves the descriptor state unspecified after some close() failures
+  // (e.g. EINTR), and its number may already have been reused.
+  static void CloseOwned(int* fd) {
+    if (*fd == -1) {
+      return;
+    }
+    const int to_close = *fd;
+    *fd = -1;
+    ASSERT_OK(internal::FileClose(to_close));
+  }
+
+  int r_ = -1, w_ = -1;
+};
+
+// Writes through a FileOutputStream wrapping the write end of a pipe, checks
+// that each write is readable from the read end, and that closing the stream
+// makes the reader see end-of-file.
+TEST_F(TestPipeIO, TestWrite) {
+  const std::string first = "test";
+  const std::string second = "data!";
+  std::shared_ptr<FileOutputStream> writer;
+  uint8_t scratch[10];
+  int64_t n_read;
+
+  MakePipe();
+  ASSERT_OK(FileOutputStream::Open(w_, &writer));
+  // The stream now owns the write end; keep the fixture from closing it.
+  w_ = -1;
+
+  ASSERT_OK(writer->Write(first.data(), first.size()));
+  ASSERT_OK(internal::FileRead(r_, scratch, 4, &n_read));
+  ASSERT_EQ(n_read, 4);
+  ASSERT_EQ(0, std::memcmp(scratch, "test", 4));
+
+  // Five bytes go in but only four are consumed here; "!" stays in the pipe.
+  ASSERT_OK(writer->Write(second.data(), second.size()));
+  ASSERT_OK(internal::FileRead(r_, scratch, 4, &n_read));
+  ASSERT_EQ(n_read, 4);
+  ASSERT_EQ(0, std::memcmp(scratch, "data", 4));
+
+  // Once the writer is closed, a two-byte read yields only the leftover byte...
+  ASSERT_OK(writer->Close());
+  ASSERT_OK(internal::FileRead(r_, scratch, 2, &n_read));
+  ASSERT_EQ(n_read, 1);
+  ASSERT_EQ(0, std::memcmp(scratch, "!", 1));
+  // ...and the following read reports end-of-file.
+  ASSERT_OK(internal::FileRead(r_, scratch, 2, &n_read));
+  ASSERT_EQ(n_read, 0);
+}
+
+// ----------------------------------------------------------------------
// Memory map tests
class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture {
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 58f82b3..a046228 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -20,6 +20,8 @@
#include <algorithm>
#include <cstdint>
+#include <cstdlib>
+#include <iostream>
#include <limits>
#include <memory>
#include <random>
@@ -70,7 +72,8 @@
do { \
::arrow::Status _s = (s); \
if (ARROW_PREDICT_FALSE(!_s.ok())) { \
- exit(EXIT_FAILURE); \
+ std::cerr << s.ToString() << "\n"; \
+ std::abort(); \
} \
} while (false);
diff --git a/cpp/src/arrow/util/io-util.cc b/cpp/src/arrow/util/io-util.cc
new file mode 100644
index 0000000..10a30df
--- /dev/null
+++ b/cpp/src/arrow/util/io-util.cc
@@ -0,0 +1,418 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Ensure 64-bit off_t for platforms where it matters
+#ifdef _FILE_OFFSET_BITS
+#undef _FILE_OFFSET_BITS
+#endif
+
+#define _FILE_OFFSET_BITS 64
+
+#include "arrow/io/windows_compatibility.h"
+
+#include <algorithm>
+#include <cerrno>
+#include <sstream>
+
+#include <fcntl.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h> // IWYU pragma: keep
+
+// Defines that don't exist in MinGW
+#if defined(__MINGW32__)
+#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR
+#elif defined(_MSC_VER) // Visual Studio
+
+#else // gcc / clang on POSIX platforms
+#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
+#endif
+
+// For filename conversion
+#if defined(_MSC_VER)
+#include <boost/system/system_error.hpp> // NOLINT
+#include <codecvt>
+#include <locale>
+#endif
+
+// ----------------------------------------------------------------------
+// file compatibility stuff
+
+#if defined(__MINGW32__) // MinGW
+// nothing
+#elif defined(_MSC_VER) // Visual Studio
+#include <io.h>
+#else // POSIX / Linux
+// nothing
+#endif
+
+#ifndef _MSC_VER // POSIX-like platforms
+#include <unistd.h>
+#endif // _MSC_VER
+
+// POSIX systems do not have this
+#ifndef O_BINARY
+#define O_BINARY 0
+#endif
+
+// define max read/write count
+#if defined(_MSC_VER)
+#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
+#else
+
+#ifdef __APPLE__
+// due to macOS bug, we need to set read/write max
+#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
+#else
+// see notes on Linux read/write manpage
+#define ARROW_MAX_IO_CHUNKSIZE 0x7ffff000
+#endif
+
+#endif
+
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+namespace internal {
+
+#define CHECK_LSEEK(retval) \
+ if ((retval) == -1) return Status::IOError("lseek failed");
+
+// Repositions `fd` using a 64-bit offset on every platform and returns the
+// resulting offset, or -1 on failure (the value CHECK_LSEEK tests for).
+static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) {
+#if !defined(_MSC_VER)
+  // _FILE_OFFSET_BITS is forced to 64 at the top of this file so that plain
+  // lseek() operates on a 64-bit off_t.
+  return lseek(fd, pos, whence);
+#else
+  return _lseeki64(fd, pos, whence);
+#endif
+}
+
+// Converts the result of an open-style call into a Status.
+//
+// ret          -- return value of the call; -1 signals failure
+// errno_actual -- error code captured by the caller right after the call
+//                 (errno itself is not re-read here, it may have changed)
+// file_name    -- file involved, quoted in the error message
+// opname       -- verb phrase for the message, e.g. "open local"
+static inline Status CheckFileOpResult(int ret, int errno_actual,
+                                       const PlatformFilename& file_name,
+                                       const char* opname) {
+  if (ret != -1) {
+    return Status::OK();
+  }
+  std::stringstream ss;
+  ss << "Failed to " << opname << " file: " << file_name.string();
+  // Unqualified strerror, as elsewhere in this file: only <string.h> is
+  // included, which is guaranteed to declare the global name but not
+  // std::strerror.
+  ss << " , error: " << strerror(errno_actual);
+  return Status::IOError(ss.str());
+}
+
+//
+// File name handling
+//
+
+// Converts a UTF-8 std::string into the platform's native filename type.
+// On MSVC the name is re-encoded to UTF-16 and a conversion failure thrown by
+// boost is returned as Status::Invalid; elsewhere the bytes are kept as-is.
+Status FileNameFromString(const std::string& file_name, PlatformFilename* out) {
+#if !defined(_MSC_VER)
+  *out = internal::PlatformFilename(file_name);
+#else
+  try {
+    std::codecvt_utf8_utf16<wchar_t> to_utf16;
+    out->assign(file_name, to_utf16);
+  } catch (const boost::system::system_error& err) {
+    return Status::Invalid(err.what());
+  }
+#endif
+  return Status::OK();
+}
+
+//
+// Functions for creating file descriptors
+//
+
+// Opens `file_name` read-only (binary mode) and stores the new descriptor in
+// *fd. Failure is reported as an IOError naming the file.
+Status FileOpenReadable(const PlatformFilename& file_name, int* fd) {
+#if defined(_MSC_VER)
+  // _wsopen_s returns an errno value directly and stores -1 in *fd on failure.
+  const int open_errno = _wsopen_s(fd, file_name.wstring().c_str(),
+                                   _O_RDONLY | _O_BINARY, _SH_DENYNO, _S_IREAD);
+  const int result = *fd;
+#else
+  *fd = open(file_name.c_str(), O_RDONLY | O_BINARY);
+  const int result = *fd;
+  const int open_errno = errno;
+#endif
+  return CheckFileOpResult(result, open_errno, file_name, "open local");
+}
+
+// Opens `file_name` for writing, creating it if needed, and stores the new
+// descriptor in *fd.
+//
+// write_only -- open write-only when true, read/write otherwise
+// truncate   -- discard existing contents when true
+Status FileOpenWriteable(const PlatformFilename& file_name, bool write_only,
+                         bool truncate, int* fd) {
+#if defined(_MSC_VER)
+  int flags = _O_CREAT | _O_BINARY;
+  flags |= write_only ? _O_WRONLY : _O_RDWR;
+  if (truncate) {
+    flags |= _O_TRUNC;
+  }
+  // Permission mode, only relevant when the file gets created.
+  const int perms = write_only ? _S_IWRITE : (_S_IWRITE | _S_IREAD);
+  const int open_errno =
+      _wsopen_s(fd, file_name.wstring().c_str(), flags, _SH_DENYNO, perms);
+  const int result = *fd;
+#else
+  int flags = O_CREAT | O_BINARY;
+  flags |= write_only ? O_WRONLY : O_RDWR;
+  if (truncate) {
+    flags |= O_TRUNC;
+  }
+  *fd = open(file_name.c_str(), flags, ARROW_WRITE_SHMODE);
+  const int result = *fd;
+  const int open_errno = errno;
+#endif
+  return CheckFileOpResult(result, open_errno, file_name, "open local");
+}
+
+// Stores the current offset of `fd` in *pos.
+Status FileTell(int fd, int64_t* pos) {
+#if defined(_MSC_VER)
+  const int64_t offset = _telli64(fd);
+  if (offset == -1) {
+    return Status::IOError("_telli64 failed");
+  }
+#else
+  // A zero-length relative seek reports the position without moving it.
+  const int64_t offset = lseek64_compat(fd, 0, SEEK_CUR);
+  CHECK_LSEEK(offset);
+#endif
+  *pos = offset;
+  return Status::OK();
+}
+
+// Creates an anonymous pipe: fd[0] receives the read end, fd[1] the write end.
+// On Windows the pipe is opened in binary mode with 4096 bytes reserved.
+Status CreatePipe(int fd[2]) {
+#if defined(_MSC_VER)
+  const int ret = _pipe(fd, 4096, _O_BINARY);
+#else
+  const int ret = pipe(fd);
+#endif
+  if (ret == -1) {
+    // Capture errno before building the message: the std::string allocations
+    // below may overwrite it, and their evaluation order is unspecified.
+    const int errno_actual = errno;
+    return Status::IOError(std::string("Error creating pipe: ") +
+                           std::string(strerror(errno_actual)));
+  }
+  return Status::OK();
+}
+
+//
+// Closing files
+//
+
+// Closes `fd`, reporting failure as an IOError.
+Status FileClose(int fd) {
+#if defined(_MSC_VER)
+  const int rc = static_cast<int>(_close(fd));
+#else
+  const int rc = static_cast<int>(close(fd));
+#endif
+  return rc == -1 ? Status::IOError("error closing file") : Status::OK();
+}
+
+//
+// Seeking and telling
+//
+
+// Moves `fd` to offset `pos` relative to `whence` (SEEK_SET, SEEK_CUR or
+// SEEK_END).
+Status FileSeek(int fd, int64_t pos, int whence) {
+  const int64_t new_offset = lseek64_compat(fd, pos, whence);
+  CHECK_LSEEK(new_offset);
+  return Status::OK();
+}
+
+// Absolute seek: shorthand for FileSeek(fd, pos, SEEK_SET).
+Status FileSeek(int fd, int64_t pos) {
+  return FileSeek(fd, pos, SEEK_SET);
+}
+
+// Stores the size of the file behind `fd` in *size by seeking to its end and
+// then restoring the original position.
+// XXX fstat() would be the natural choice, but going through lseek also
+// verifies that the descriptor is seekable.
+Status FileGetSize(int fd, int64_t* size) {
+  const int64_t saved_offset = lseek64_compat(fd, 0, SEEK_CUR);
+  CHECK_LSEEK(saved_offset);
+
+  // Seeking to the end reports the file length.
+  const int64_t end_offset = lseek64_compat(fd, 0, SEEK_END);
+  CHECK_LSEEK(end_offset);
+  *size = end_offset;
+
+  const int64_t restored_offset = lseek64_compat(fd, saved_offset, SEEK_SET);
+  CHECK_LSEEK(restored_offset);
+  return Status::OK();
+}
+
+//
+// Reading data
+//
+
+// Positional read of up to `nbytes` bytes at file offset `pos` into `buf`.
+// Returns the number of bytes read (0 at end of file) or -1 on error.
+static inline int64_t pread_compat(int fd, void* buf, int64_t nbytes, int64_t pos) {
+#if defined(_MSC_VER)
+ // No pread() on Windows: issue ReadFile() on the CRT descriptor's OS handle,
+ // passing the 64-bit offset split across OVERLAPPED.Offset/OffsetHigh.
+ HANDLE handle = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
+ DWORD dwBytesRead = 0;
+ OVERLAPPED overlapped = {0};
+ overlapped.Offset = static_cast<uint32_t>(pos);
+ overlapped.OffsetHigh = static_cast<uint32_t>(pos >> 32);
+
+ // Note: ReadFile() will update the file position
+ // NOTE(review): nbytes is narrowed to 32 bits; the caller in this file
+ // (FileReadAt) caps it at ARROW_MAX_IO_CHUNKSIZE, INT32_MAX on MSVC.
+ BOOL bRet =
+ ReadFile(handle, buf, static_cast<uint32_t>(nbytes), &dwBytesRead, &overlapped);
+ // ERROR_HANDLE_EOF is treated as a successful short (possibly empty) read.
+ if (bRet || GetLastError() == ERROR_HANDLE_EOF) {
+ return dwBytesRead;
+ } else {
+ // NOTE(review): errno is not set on this path, so a caller formatting
+ // strerror(errno) may report an unrelated message.
+ return -1;
+ }
+#else
+ return static_cast<int64_t>(
+ pread(fd, buf, static_cast<size_t>(nbytes), static_cast<off_t>(pos)));
+#endif
+}
+
+// Reads up to `nbytes` bytes from the current position of `fd` into `buffer`,
+// looping over chunks of at most ARROW_MAX_IO_CHUNKSIZE bytes until `nbytes`
+// bytes have been read or end of file is reached. Calls interrupted by a
+// signal (EINTR, e.g. while blocked on a pipe) are retried.
+//
+// On success *bytes_read holds the number of bytes read, fewer than `nbytes`
+// only at end of file. On failure an IOError is returned and *bytes_read is
+// set to -1.
+Status FileRead(int fd, uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
+  *bytes_read = 0;
+
+  while (*bytes_read < nbytes) {
+    const int64_t chunksize =
+        std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - *bytes_read);
+#if defined(_MSC_VER)
+    const int64_t ret =
+        static_cast<int64_t>(_read(fd, buffer, static_cast<uint32_t>(chunksize)));
+#else
+    const int64_t ret =
+        static_cast<int64_t>(read(fd, buffer, static_cast<size_t>(chunksize)));
+#endif
+    if (ret == -1) {
+      // Capture errno immediately: the allocations made while building the
+      // message below may overwrite it.
+      const int errno_actual = errno;
+      if (errno_actual == EINTR) {
+        // Interrupted before any data was transferred: simply retry.
+        continue;
+      }
+      *bytes_read = -1;
+      return Status::IOError(std::string("Error reading bytes from file: ") +
+                             std::string(strerror(errno_actual)));
+    }
+    if (ret == 0) {
+      // EOF
+      break;
+    }
+    buffer += ret;
+    *bytes_read += ret;
+  }
+  return Status::OK();
+}
+
+// Positional variant of FileRead(): reads up to `nbytes` bytes starting at
+// file offset `position`, looping over chunks of at most
+// ARROW_MAX_IO_CHUNKSIZE bytes until `nbytes` bytes have been read or end of
+// file is reached. Goes through pread_compat(), which on Windows moves the
+// file position as a side effect.
+//
+// On success *bytes_read holds the number of bytes read. On failure an
+// IOError is returned and *bytes_read is set to -1.
+Status FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes,
+                  int64_t* bytes_read) {
+  *bytes_read = 0;
+
+  while (*bytes_read < nbytes) {
+    const int64_t chunksize =
+        std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - *bytes_read);
+    const int64_t ret = pread_compat(fd, buffer, chunksize, position);
+    if (ret == -1) {
+      // Capture errno immediately: the allocations made while building the
+      // message below may overwrite it. No EINTR retry here, because the
+      // Windows branch of pread_compat() does not set errno and a stale EINTR
+      // could then loop forever.
+      const int errno_actual = errno;
+      *bytes_read = -1;
+      return Status::IOError(std::string("Error reading bytes from file: ") +
+                             std::string(strerror(errno_actual)));
+    }
+    if (ret == 0) {
+      // EOF
+      break;
+    }
+    buffer += ret;
+    position += ret;
+    *bytes_read += ret;
+  }
+  return Status::OK();
+}
+
+//
+// Writing data
+//
+
+// Writes all `nbytes` bytes of `buffer` to `fd`, looping over chunks of at
+// most ARROW_MAX_IO_CHUNKSIZE bytes and over partial writes (which pipes may
+// produce). Calls interrupted by a signal (EINTR) are retried. Any other
+// failure is returned as an IOError.
+Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes) {
+  int64_t bytes_written = 0;
+
+  while (bytes_written < nbytes) {
+    const int64_t chunksize =
+        std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - bytes_written);
+#if defined(_MSC_VER)
+    const int64_t ret = static_cast<int64_t>(
+        _write(fd, buffer + bytes_written, static_cast<uint32_t>(chunksize)));
+#else
+    const int64_t ret = static_cast<int64_t>(
+        write(fd, buffer + bytes_written, static_cast<size_t>(chunksize)));
+#endif
+    if (ret == -1) {
+      // Capture errno immediately: the allocations made while building the
+      // message below may overwrite it.
+      const int errno_actual = errno;
+      if (errno_actual == EINTR) {
+        continue;
+      }
+      return Status::IOError(std::string("Error writing bytes from file: ") +
+                             std::string(strerror(errno_actual)));
+    }
+    bytes_written += ret;
+  }
+  return Status::OK();
+}
+
+// Sets the length of the file behind `fd` to exactly `size` bytes.
+Status FileTruncate(int fd, const int64_t size) {
+  int ret, errno_actual;
+
+#ifdef _MSC_VER
+  // _chsize_s takes a 64-bit size and returns an errno value (0 on success).
+  // Passing `size` unchanged avoids truncating it through a 32-bit size_t.
+  errno_actual = _chsize_s(fd, size);
+  ret = errno_actual == 0 ? 0 : -1;
+#else
+  // ftruncate takes off_t (64-bit here, see _FILE_OFFSET_BITS above). Casting
+  // through size_t would drop the upper 32 bits of sizes >= 4 GiB on 32-bit
+  // platforms.
+  ret = ftruncate(fd, static_cast<off_t>(size));
+  errno_actual = errno;
+#endif
+
+  if (ret == -1) {
+    return Status::IOError(std::string("Error truncating file: ") +
+                           std::string(strerror(errno_actual)));
+  }
+  return Status::OK();
+}
+
+} // namespace internal
+} // namespace arrow
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
index 2a01be1..e857490 100644
--- a/cpp/src/arrow/util/io-util.h
+++ b/cpp/src/arrow/util/io-util.h
@@ -20,11 +20,16 @@
#include <iostream>
#include <memory>
+#include <string>
#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
+#if defined(_MSC_VER)
+#include <boost/filesystem.hpp> // NOLINT
+#endif
+
namespace arrow {
namespace io {
@@ -113,6 +118,52 @@ class StdinStream : public InputStream {
};
} // namespace io
+
+namespace internal {
+
+// Platform-native filename type used by the file helpers below.
+#if defined(_MSC_VER)
+// On Windows a boost::filesystem::path (accessed as a wide string by the
+// helpers); FileNameFromString() converts from UTF-8.
+typedef ::boost::filesystem::path PlatformFilename;
+
+#else
+
+// On other platforms a plain byte string, passed to the OS unchanged.
+struct PlatformFilename {
+  PlatformFilename() = default;
+  explicit PlatformFilename(const std::string& path) : utf8_path(path) {}
+
+  const char* c_str() const { return utf8_path.c_str(); }
+
+  const std::string& string() const { return utf8_path; }
+
+  size_t length() const { return utf8_path.size(); }
+
+  std::string utf8_path;
+};
+#endif
+
+// Converts a UTF-8 string into a PlatformFilename.
+Status FileNameFromString(const std::string& file_name, PlatformFilename* out);
+
+// Opens a file read-only; the new descriptor is stored in *fd.
+Status FileOpenReadable(const PlatformFilename& file_name, int* fd);
+// Opens a file for writing (write-only or read/write), creating it if needed
+// and optionally truncating it; the new descriptor is stored in *fd.
+Status FileOpenWriteable(const PlatformFilename& file_name, bool write_only,
+                         bool truncate, int* fd);
+
+// Reads up to nbytes from the current position, stopping early only at EOF;
+// the count actually read is stored in *bytes_read.
+Status FileRead(int fd, uint8_t* buffer, const int64_t nbytes, int64_t* bytes_read);
+// Like FileRead(), but reads starting at an explicit file offset.
+Status FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes,
+                  int64_t* bytes_read);
+// Writes all nbytes bytes of buffer.
+Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes);
+// Resizes the file to exactly `size` bytes.
+Status FileTruncate(int fd, const int64_t size);
+
+// Stores the current file position in *pos.
+Status FileTell(int fd, int64_t* pos);
+// Seeks to absolute offset `pos`.
+Status FileSeek(int fd, int64_t pos);
+// Seeks to `pos` relative to `whence` (SEEK_SET, SEEK_CUR or SEEK_END).
+Status FileSeek(int fd, int64_t pos, int whence);
+// Stores the file size in *size; fails if the descriptor is not seekable.
+Status FileGetSize(int fd, int64_t* size);
+
+Status FileClose(int fd);
+
+// Creates an anonymous pipe: fd[0] is the read end, fd[1] the write end.
+Status CreatePipe(int fd[2]);
+
+} // namespace internal
} // namespace arrow
#endif // ARROW_UTIL_IO_UTIL_H
--
To stop receiving notification emails like this one, please contact
uwe@apache.org.