You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/04/21 14:05:12 UTC

[arrow] branch master updated: ARROW-1018: [C++] Create FileOutputStream, ReadableFile from file descriptor

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 54df19d  ARROW-1018: [C++] Create FileOutputStream, ReadableFile from file descriptor
54df19d is described below

commit 54df19db82eca92ffddbdcd97aab723d5c393f5f
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Sat Apr 21 15:05:08 2018 +0100

    ARROW-1018: [C++] Create FileOutputStream, ReadableFile from file descriptor
    
    Also:
    - move platform-specific code to `util/io-util.h`
    - backport the `FileReadAt` helper from PR #1867 (to minimize conflicts)
    - add a pipe-writing benchmark for more realistic numbers
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #1909 from pitrou/ARROW-1018-open-fd and squashes the following commits:
    
    0153b150 <Antoine Pitrou> Add pipe-writing benchmark
    57ea1657 <Antoine Pitrou> ARROW-1018:  Allow creating file objects from a file descriptor
---
 .travis.yml                           |   2 +-
 cpp/src/arrow/CMakeLists.txt          |   3 +-
 cpp/src/arrow/io/file.cc              | 414 ++++++---------------------------
 cpp/src/arrow/io/file.h               |  37 +++
 cpp/src/arrow/io/io-file-benchmark.cc | 181 +++++++++++++--
 cpp/src/arrow/io/io-file-test.cc      | 160 +++++++++++--
 cpp/src/arrow/test-util.h             |   5 +-
 cpp/src/arrow/util/io-util.cc         | 418 ++++++++++++++++++++++++++++++++++
 cpp/src/arrow/util/io-util.h          |  51 +++++
 9 files changed, 893 insertions(+), 378 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 2fd31be..59ad4bc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -87,7 +87,7 @@ matrix:
     before_script:
     - if [ $ARROW_CI_PYTHON_AFFECTED != "1" ]; then exit; fi
     # If either C++ or Python changed, we must install the C++ libraries
-    - travis_wait 50 $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
+    - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
     script:
     - if [ $ARROW_CI_CPP_AFFECTED == "1" ]; then $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh; fi
     - $TRAVIS_BUILD_DIR/ci/travis_build_parquet_cpp.sh
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index da7e24d..a3997c7 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -40,11 +40,12 @@ set(ARROW_SRCS
   util/cpu-info.cc
   util/decimal.cc
   util/hash.cc
+  util/io-util.cc
   util/key_value_metadata.cc
 )
 
 if ("${COMPILER_FAMILY}" STREQUAL "clang")
-  set_property(SOURCE io/file.cc
+  set_property(SOURCE util/io-util.cc
     APPEND_STRING
     PROPERTY COMPILE_FLAGS
     " -Wno-unused-macros ")
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 008f2b2..ba012be 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -15,37 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Ensure 64-bit off_t for platforms where it matters
-#ifdef _FILE_OFFSET_BITS
-#undef _FILE_OFFSET_BITS
-#endif
-
-#define _FILE_OFFSET_BITS 64
-
-// define max read/write count
-#if defined(_MSC_VER)
-#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
-#else
-
-#ifdef __APPLE__
-// due to macOS bug, we need to set read/write max
-#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
-#else
-// see notes on Linux read/write manpage
-#define ARROW_MAX_IO_CHUNKSIZE 0x7ffff000
-#endif
-
-#endif
-
-#include "arrow/io/file.h"
-
-#if _WIN32 || _WIN64
-#if _WIN64
-#define ENVIRONMENT64
-#else
-#define ENVIRONMENT32
-#endif
-#endif
+#include "arrow/io/windows_compatibility.h"
 
 // sys/mman.h not present in Visual Studio or Cygwin
 #ifdef _WIN32
@@ -55,334 +25,88 @@
 #include "arrow/io/mman.h"
 #undef Realloc
 #undef Free
-#include <windows.h>
 #else
 #include <sys/mman.h>
 #endif
 
+#include <string.h>
+
 #include <algorithm>
 #include <cerrno>
 #include <cstdint>
 #include <cstring>
 #include <mutex>
-#include <sstream>  // IWYU pragma: keep
-
-#if defined(_MSC_VER)
-#include <codecvt>
-#include <locale>
-#endif
-
-#include <fcntl.h>
-#include <sys/stat.h>
-#include <sys/types.h>  // IWYU pragma: keep
-
-#ifndef _MSC_VER  // POSIX-like platforms
-
-#include <unistd.h>
-
-// Not available on some platforms
-#ifndef errno_t
-#define errno_t int
-#endif
-
-#endif  // _MSC_VER
-
-// defines that don't exist in MinGW
-#if defined(__MINGW32__)
-#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR
-#elif defined(_MSC_VER)  // Visual Studio
-
-#else  // gcc / clang on POSIX platforms
-#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
-#endif
-
-// ----------------------------------------------------------------------
-// file compatibility stuff
-
-#if defined(__MINGW32__)  // MinGW
-// nothing
-#elif defined(_MSC_VER)  // Visual Studio
-#include <io.h>
-#else  // POSIX / Linux
-// nothing
-#endif
-
-// POSIX systems do not have this
-#ifndef O_BINARY
-#define O_BINARY 0
-#endif
+#include <sstream>
 
 // ----------------------------------------------------------------------
 // Other Arrow includes
 
+#include "arrow/io/file.h"
 #include "arrow/io/interfaces.h"
 
 #include "arrow/buffer.h"
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
+#include "arrow/util/io-util.h"
 #include "arrow/util/logging.h"
 
-#if defined(_MSC_VER)
-#include <boost/filesystem.hpp>           // NOLINT
-#include <boost/system/system_error.hpp>  // NOLINT
-namespace fs = boost::filesystem;
-#define PlatformFilename fs::path
-
-namespace arrow {
-namespace io {
-
-#else
 namespace arrow {
 namespace io {
 
-struct PlatformFilename {
-  PlatformFilename() {}
-  explicit PlatformFilename(const std::string& path) { utf8_path = path; }
-
-  const char* c_str() const { return utf8_path.c_str(); }
-
-  const std::string& string() const { return utf8_path; }
-
-  size_t length() const { return utf8_path.size(); }
-
-  std::string utf8_path;
-};
-#endif
-
-static inline Status CheckFileOpResult(int ret, int errno_actual,
-                                       const PlatformFilename& file_name,
-                                       const std::string& opname) {
-  if (ret == -1) {
-    std::stringstream ss;
-    ss << "Failed to " << opname << " file: " << file_name.string();
-    ss << " , error: " << std::strerror(errno_actual);
-    return Status::IOError(ss.str());
-  }
-  return Status::OK();
-}
-
-#define CHECK_LSEEK(retval) \
-  if ((retval) == -1) return Status::IOError("lseek failed");
-
-static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) {
-#if defined(_MSC_VER)
-  return _lseeki64(fd, pos, whence);
-#else
-  return lseek(fd, pos, whence);
-#endif
-}
-
-static inline Status FileOpenReadable(const PlatformFilename& file_name, int* fd) {
-  int ret;
-  errno_t errno_actual = 0;
-#if defined(_MSC_VER)
-  errno_actual = _wsopen_s(fd, file_name.wstring().c_str(), _O_RDONLY | _O_BINARY,
-                           _SH_DENYNO, _S_IREAD);
-  ret = *fd;
-#else
-  ret = *fd = open(file_name.c_str(), O_RDONLY | O_BINARY);
-  errno_actual = errno;
-#endif
-
-  return CheckFileOpResult(ret, errno_actual, file_name, "open local");
-}
-
-static inline Status FileOpenWriteable(const PlatformFilename& file_name, bool write_only,
-                                       bool truncate, int* fd) {
-  int ret;
-  errno_t errno_actual = 0;
-
-#if defined(_MSC_VER)
-  int oflag = _O_CREAT | _O_BINARY;
-  int pmode = _S_IWRITE;
-  if (!write_only) {
-    pmode |= _S_IREAD;
-  }
-
-  if (truncate) {
-    oflag |= _O_TRUNC;
-  }
-
-  if (write_only) {
-    oflag |= _O_WRONLY;
-  } else {
-    oflag |= _O_RDWR;
-  }
-
-  errno_actual = _wsopen_s(fd, file_name.wstring().c_str(), oflag, _SH_DENYNO, pmode);
-  ret = *fd;
-
-#else
-  int oflag = O_CREAT | O_BINARY;
-
-  if (truncate) {
-    oflag |= O_TRUNC;
-  }
-
-  if (write_only) {
-    oflag |= O_WRONLY;
-  } else {
-    oflag |= O_RDWR;
-  }
-
-  ret = *fd = open(file_name.c_str(), oflag, ARROW_WRITE_SHMODE);
-#endif
-  return CheckFileOpResult(ret, errno_actual, file_name, "open local");
-}
-
-static inline Status FileTell(int fd, int64_t* pos) {
-  int64_t current_pos;
-
-#if defined(_MSC_VER)
-  current_pos = _telli64(fd);
-  if (current_pos == -1) {
-    return Status::IOError("_telli64 failed");
-  }
-#else
-  current_pos = lseek64_compat(fd, 0, SEEK_CUR);
-  CHECK_LSEEK(current_pos);
-#endif
-
-  *pos = current_pos;
-  return Status::OK();
-}
-
-static inline Status FileSeek(int fd, int64_t pos) {
-  int64_t ret = lseek64_compat(fd, pos, SEEK_SET);
-  CHECK_LSEEK(ret);
-  return Status::OK();
-}
-
-static inline Status FileRead(const int fd, uint8_t* buffer, const int64_t nbytes,
-                              int64_t* bytes_read) {
-  *bytes_read = 0;
-
-  while (*bytes_read != -1 && *bytes_read < nbytes) {
-    int64_t chunksize =
-        std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - *bytes_read);
-#if defined(_MSC_VER)
-    int64_t ret = static_cast<int64_t>(
-        _read(fd, buffer + *bytes_read, static_cast<uint32_t>(chunksize)));
-#else
-    int64_t ret = static_cast<int64_t>(
-        read(fd, buffer + *bytes_read, static_cast<size_t>(chunksize)));
-#endif
-
-    if (ret != -1) {
-      *bytes_read += ret;
-      if (ret < chunksize) {
-        // EOF
-        break;
-      }
-    } else {
-      *bytes_read = ret;
-    }
-  }
-
-  if (*bytes_read == -1) {
-    return Status::IOError(std::string("Error reading bytes from file: ") +
-                           std::string(strerror(errno)));
-  }
-
-  return Status::OK();
-}
-
-static inline Status FileWrite(const int fd, const uint8_t* buffer,
-                               const int64_t nbytes) {
-  int ret = 0;
-  int64_t bytes_written = 0;
-
-  while (ret != -1 && bytes_written < nbytes) {
-    int64_t chunksize =
-        std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - bytes_written);
-#if defined(_MSC_VER)
-    ret = static_cast<int>(
-        _write(fd, buffer + bytes_written, static_cast<uint32_t>(chunksize)));
-#else
-    ret = static_cast<int>(
-        write(fd, buffer + bytes_written, static_cast<size_t>(chunksize)));
-#endif
-
-    if (ret != -1) {
-      bytes_written += ret;
-    }
-  }
-
-  if (ret == -1) {
-    return Status::IOError(std::string("Error writing bytes from file: ") +
-                           std::string(strerror(errno)));
-  }
-  return Status::OK();
-}
-
-static inline Status FileGetSize(int fd, int64_t* size) {
-  int64_t ret;
-
-  // Save current position
-  int64_t current_position = lseek64_compat(fd, 0, SEEK_CUR);
-  CHECK_LSEEK(current_position);
-
-  // move to end of the file
-  ret = lseek64_compat(fd, 0, SEEK_END);
-  CHECK_LSEEK(ret);
-
-  // Get file length
-  ret = lseek64_compat(fd, 0, SEEK_CUR);
-  CHECK_LSEEK(ret);
-
-  *size = ret;
-
-  // Restore file position
-  ret = lseek64_compat(fd, current_position, SEEK_SET);
-  CHECK_LSEEK(ret);
-
-  return Status::OK();
-}
-
-static inline Status FileClose(int fd) {
-  int ret;
-
-#if defined(_MSC_VER)
-  ret = static_cast<int>(_close(fd));
-#else
-  ret = static_cast<int>(close(fd));
-#endif
-
-  if (ret == -1) {
-    return Status::IOError("error closing file");
-  }
-  return Status::OK();
-}
-
 class OSFile {
  public:
   OSFile() : fd_(-1), is_open_(false), size_(-1) {}
 
   ~OSFile() {}
 
+  // Note: only one of the Open* methods below may be called on a given instance
+
   Status OpenWriteable(const std::string& path, bool append, bool write_only) {
     RETURN_NOT_OK(SetFileName(path));
 
-    RETURN_NOT_OK(FileOpenWriteable(file_name_, write_only, !append, &fd_));
+    RETURN_NOT_OK(internal::FileOpenWriteable(file_name_, write_only, !append, &fd_));
     is_open_ = true;
     mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
 
     if (append) {
-      RETURN_NOT_OK(FileGetSize(fd_, &size_));
+      RETURN_NOT_OK(internal::FileGetSize(fd_, &size_));
     } else {
       size_ = 0;
     }
     return Status::OK();
   }
 
+  // Unlike OpenWriteable(string, ...), this overload neither truncates the
+  // file nor requires it to be seekable
+  Status OpenWriteable(int fd) {
+    if (!internal::FileGetSize(fd, &size_).ok()) {
+      // Non-seekable file
+      size_ = -1;
+    }
+    RETURN_NOT_OK(SetFileName(fd));
+    is_open_ = true;
+    mode_ = FileMode::WRITE;
+    fd_ = fd;
+    return Status::OK();
+  }
+
   Status OpenReadable(const std::string& path) {
     RETURN_NOT_OK(SetFileName(path));
 
-    RETURN_NOT_OK(FileOpenReadable(file_name_, &fd_));
-    RETURN_NOT_OK(FileGetSize(fd_, &size_));
+    RETURN_NOT_OK(internal::FileOpenReadable(file_name_, &fd_));
+    RETURN_NOT_OK(internal::FileGetSize(fd_, &size_));
+
+    is_open_ = true;
+    mode_ = FileMode::READ;
+    return Status::OK();
+  }
 
+  Status OpenReadable(int fd) {
+    RETURN_NOT_OK(internal::FileGetSize(fd, &size_));
+    RETURN_NOT_OK(SetFileName(fd));
     is_open_ = true;
     mode_ = FileMode::READ;
+    fd_ = fd;
     return Status::OK();
   }
 
@@ -391,13 +115,13 @@ class OSFile {
       // Even if closing fails, the fd will likely be closed (perhaps it's
       // already closed).
       is_open_ = false;
-      RETURN_NOT_OK(FileClose(fd_));
+      RETURN_NOT_OK(internal::FileClose(fd_));
     }
     return Status::OK();
   }
 
   Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
-    return FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes, bytes_read);
+    return internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes, bytes_read);
   }
 
   Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
@@ -410,17 +134,17 @@ class OSFile {
     if (pos < 0) {
       return Status::Invalid("Invalid position");
     }
-    return FileSeek(fd_, pos);
+    return internal::FileSeek(fd_, pos);
   }
 
-  Status Tell(int64_t* pos) const { return FileTell(fd_, pos); }
+  Status Tell(int64_t* pos) const { return internal::FileTell(fd_, pos); }
 
   Status Write(const void* data, int64_t length) {
     std::lock_guard<std::mutex> guard(lock_);
     if (length < 0) {
       return Status::IOError("Length must be non-negative");
     }
-    return FileWrite(fd_, reinterpret_cast<const uint8_t*>(data), length);
+    return internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data), length);
   }
 
   int fd() const { return fd_; }
@@ -435,20 +159,15 @@ class OSFile {
 
  protected:
   Status SetFileName(const std::string& file_name) {
-#if defined(_MSC_VER)
-    try {
-      std::codecvt_utf8_utf16<wchar_t> utf16_converter;
-      file_name_.assign(file_name, utf16_converter);
-    } catch (boost::system::system_error& e) {
-      return Status::Invalid(e.what());
-    }
-#else
-    file_name_ = PlatformFilename(file_name);
-#endif
-    return Status::OK();
+    return internal::FileNameFromString(file_name, &file_name_);
+  }
+  Status SetFileName(int fd) {
+    std::stringstream ss;
+    ss << "<fd " << fd << ">";
+    return SetFileName(ss.str());
   }
 
-  PlatformFilename file_name_;
+  internal::PlatformFilename file_name_;
 
   std::mutex lock_;
 
@@ -469,6 +188,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
   explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {}
 
   Status Open(const std::string& path) { return OpenReadable(path); }
+  Status Open(int fd) { return OpenReadable(fd); }
 
   Status ReadBuffer(int64_t nbytes, std::shared_ptr<Buffer>* out) {
     std::shared_ptr<ResizableBuffer> buffer;
@@ -492,8 +212,7 @@ ReadableFile::ReadableFile(MemoryPool* pool) { impl_.reset(new ReadableFileImpl(
 ReadableFile::~ReadableFile() { DCHECK(impl_->Close().ok()); }
 
 Status ReadableFile::Open(const std::string& path, std::shared_ptr<ReadableFile>* file) {
-  *file = std::shared_ptr<ReadableFile>(new ReadableFile(default_memory_pool()));
-  return (*file)->impl_->Open(path);
+  return Open(path, default_memory_pool(), file);
 }
 
 Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool,
@@ -502,6 +221,16 @@ Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool,
   return (*file)->impl_->Open(path);
 }
 
+Status ReadableFile::Open(int fd, MemoryPool* memory_pool,
+                          std::shared_ptr<ReadableFile>* file) {
+  *file = std::shared_ptr<ReadableFile>(new ReadableFile(memory_pool));
+  return (*file)->impl_->Open(fd);
+}
+
+Status ReadableFile::Open(int fd, std::shared_ptr<ReadableFile>* file) {
+  return Open(fd, default_memory_pool(), file);
+}
+
 Status ReadableFile::Close() { return impl_->Close(); }
 
 Status ReadableFile::Tell(int64_t* pos) const { return impl_->Tell(pos); }
@@ -545,8 +274,9 @@ int ReadableFile::file_descriptor() const { return impl_->fd(); }
 class FileOutputStream::FileOutputStreamImpl : public OSFile {
  public:
   Status Open(const std::string& path, bool append) {
-    return OpenWriteable(path, append, true);
+    return OpenWriteable(path, append, true /* write_only */);
   }
+  Status Open(int fd) { return OpenWriteable(fd); }
 };
 
 FileOutputStream::FileOutputStream() { impl_.reset(new FileOutputStreamImpl()); }
@@ -567,6 +297,11 @@ Status FileOutputStream::Open(const std::string& path, bool append,
   return std::static_pointer_cast<FileOutputStream>(*out)->impl_->Open(path, append);
 }
 
+Status FileOutputStream::Open(int fd, std::shared_ptr<OutputStream>* out) {
+  *out = std::shared_ptr<FileOutputStream>(new FileOutputStream());
+  return std::static_pointer_cast<FileOutputStream>(*out)->impl_->Open(fd);
+}
+
 Status FileOutputStream::Open(const std::string& path,
                               std::shared_ptr<FileOutputStream>* file) {
   return Open(path, false, file);
@@ -579,6 +314,11 @@ Status FileOutputStream::Open(const std::string& path, bool append,
   return (*file)->impl_->Open(path, append);
 }
 
+Status FileOutputStream::Open(int fd, std::shared_ptr<FileOutputStream>* file) {
+  *file = std::shared_ptr<FileOutputStream>(new FileOutputStream());
+  return (*file)->impl_->Open(fd);
+}
+
 Status FileOutputStream::Close() { return impl_->Close(); }
 
 Status FileOutputStream::Tell(int64_t* pos) const { return impl_->Tell(pos); }
@@ -682,20 +422,10 @@ MemoryMappedFile::~MemoryMappedFile() {}
 
 Status MemoryMappedFile::Create(const std::string& path, int64_t size,
                                 std::shared_ptr<MemoryMappedFile>* out) {
-  int ret;
-  errno_t errno_actual;
   std::shared_ptr<FileOutputStream> file;
   RETURN_NOT_OK(FileOutputStream::Open(path, &file));
 
-#ifdef _MSC_VER
-  errno_actual = _chsize_s(file->file_descriptor(), static_cast<size_t>(size));
-  ret = errno_actual == 0 ? 0 : -1;
-#else
-  ret = ftruncate(file->file_descriptor(), static_cast<size_t>(size));
-  errno_actual = errno;
-#endif
-
-  RETURN_NOT_OK(CheckFileOpResult(ret, errno_actual, PlatformFilename(path), "truncate"));
+  RETURN_NOT_OK(internal::FileTruncate(file->file_descriptor(), size));
 
   RETURN_NOT_OK(file->Close());
   return MemoryMappedFile::Open(path, FileMode::READWRITE, out);
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index a1f9edc..c2572da 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -54,6 +54,15 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
   static Status Open(const std::string& path, bool append,
                      std::shared_ptr<OutputStream>* out);
 
+  /// \brief Open a file descriptor for writing.  The underlying file isn't
+  /// truncated.
+  /// \param[in] fd file descriptor
+  /// \param[out] out a base interface OutputStream instance
+  ///
+  /// The file descriptor becomes owned by the OutputStream, and will be closed
+  /// on Close() or destruction.
+  static Status Open(int fd, std::shared_ptr<OutputStream>* out);
+
   /// \brief Open a local file for writing, truncating any existing file
   /// \param[in] path with UTF8 encoding
   /// \param[out] file a FileOutputStream instance
@@ -69,6 +78,15 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
   static Status Open(const std::string& path, bool append,
                      std::shared_ptr<FileOutputStream>* file);
 
+  /// \brief Open a file descriptor for writing.  The underlying file isn't
+  /// truncated.
+  /// \param[in] fd file descriptor
+  /// \param[out] out a FileOutputStream instance
+  ///
+  /// The file descriptor becomes owned by the OutputStream, and will be closed
+  /// on Close() or destruction.
+  static Status Open(int fd, std::shared_ptr<FileOutputStream>* out);
+
   // OutputStream interface
   Status Close() override;
   Status Tell(int64_t* position) const override;
@@ -104,6 +122,25 @@ class ARROW_EXPORT ReadableFile : public RandomAccessFile {
   static Status Open(const std::string& path, MemoryPool* pool,
                      std::shared_ptr<ReadableFile>* file);
 
+  /// \brief Open a file descriptor for reading
+  /// \param[in] fd file descriptor
+  /// \param[out] file ReadableFile instance
+  /// Memory allocations use the default memory pool
+  ///
+  /// The file descriptor becomes owned by the ReadableFile, and will be closed
+  /// on Close() or destruction.
+  static Status Open(int fd, std::shared_ptr<ReadableFile>* file);
+
+  /// \brief Open a file descriptor for reading
+  /// \param[in] fd file descriptor
+  /// \param[in] pool a MemoryPool for memory allocations
+  /// \param[out] file ReadableFile instance
+  /// Open the file descriptor with one's own memory pool for memory allocations
+  ///
+  /// The file descriptor becomes owned by the ReadableFile, and will be closed
+  /// on Close() or destruction.
+  static Status Open(int fd, MemoryPool* pool, std::shared_ptr<ReadableFile>* file);
+
   Status Close() override;
   Status Tell(int64_t* position) const override;
 
diff --git a/cpp/src/arrow/io/io-file-benchmark.cc b/cpp/src/arrow/io/io-file-benchmark.cc
index bb35b08..e5a326e 100644
--- a/cpp/src/arrow/io/io-file-benchmark.cc
+++ b/cpp/src/arrow/io/io-file-benchmark.cc
@@ -19,25 +19,109 @@
 #include "arrow/io/buffered.h"
 #include "arrow/io/file.h"
 #include "arrow/test-util.h"
+#include "arrow/util/io-util.h"
 
 #include "benchmark/benchmark.h"
 
 #include <algorithm>
+#include <atomic>
+#include <cstdlib>
 #include <iostream>
+#include <thread>
 #include <valarray>
 
+#include <fcntl.h>
+#include <poll.h>
+#include <unistd.h>
+
 namespace arrow {
 
-// XXX Writing to /dev/null is irrealistic as the kernel likely doesn't
-// copy the data at all.  Use a socketpair instead?
 std::string GetNullFile() { return "/dev/null"; }
 
 const std::valarray<int64_t> small_sizes = {8, 24, 33, 1, 32, 192, 16, 40};
 const std::valarray<int64_t> large_sizes = {8192, 100000};
 
+class BackgroundReader {
+  // A class that reads data in the background from a file descriptor
+
+ public:
+  static std::shared_ptr<BackgroundReader> StartReader(int fd) {
+    std::shared_ptr<BackgroundReader> reader(new BackgroundReader(fd));
+    reader->worker_.reset(new std::thread([=] { reader->LoopReading(); }));
+    return reader;
+  }
+  void Stop() {
+    const uint8_t data[] = "x";
+    ABORT_NOT_OK(internal::FileWrite(wakeup_w_, data, 1));
+  }
+  void Join() { worker_->join(); }
+
+  ~BackgroundReader() {
+    for (int fd : {fd_, wakeup_r_, wakeup_w_}) {
+      ABORT_NOT_OK(internal::FileClose(fd));
+    }
+  }
+
+ protected:
+  explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) {
+    // Prepare self-pipe trick
+    int wakeupfd[2];
+    ABORT_NOT_OK(internal::CreatePipe(wakeupfd));
+    wakeup_r_ = wakeupfd[0];
+    wakeup_w_ = wakeupfd[1];
+    // Put fd in non-blocking mode
+    fcntl(fd, F_SETFL, O_NONBLOCK);
+  }
+
+  void LoopReading() {
+    struct pollfd pollfds[2];
+    pollfds[0].fd = fd_;
+    pollfds[0].events = POLLIN;
+    pollfds[1].fd = wakeup_r_;
+    pollfds[1].events = POLLIN;
+    while (true) {
+      int ret = poll(pollfds, 2, -1 /* timeout */);
+      if (ret < 1) {
+        std::cerr << "poll() failed with code " << ret << "\n";
+        abort();
+      }
+      if (pollfds[1].revents & POLLIN) {
+        // We're done
+        break;
+      }
+      if (!(pollfds[0].revents & POLLIN)) {
+        continue;
+      }
+      int64_t bytes_read;
+      // There could be a spurious wakeup followed by EAGAIN
+      ARROW_UNUSED(internal::FileRead(fd_, buffer_, buffer_size_, &bytes_read));
+      total_bytes_ += bytes_read;
+    }
+  }
+
+  int fd_, wakeup_r_, wakeup_w_;
+  int64_t total_bytes_;
+
+  static const int64_t buffer_size_ = 16384;
+  uint8_t buffer_[buffer_size_];
+
+  std::unique_ptr<std::thread> worker_;
+};
+
+// Set up a pipe with an OutputStream at one end and a BackgroundReader at
+// the other end.
+static void SetupPipeWriter(std::shared_ptr<io::OutputStream>* stream,
+                            std::shared_ptr<BackgroundReader>* reader) {
+  int fd[2];
+  ABORT_NOT_OK(internal::CreatePipe(fd));
+  ABORT_NOT_OK(io::FileOutputStream::Open(fd[1], stream));
+  *reader = BackgroundReader::StartReader(fd[0]);
+}
+
 static void BenchmarkStreamingWrites(benchmark::State& state,
                                      std::valarray<int64_t> sizes,
-                                     io::OutputStream* stream) {
+                                     io::OutputStream* stream,
+                                     BackgroundReader* reader = nullptr) {
   const std::string datastr(*std::max_element(std::begin(sizes), std::end(sizes)), 'x');
   const void* data = datastr.data();
   const int64_t sum_sizes = sizes.sum();
@@ -47,10 +131,23 @@ static void BenchmarkStreamingWrites(benchmark::State& state,
       ABORT_NOT_OK(stream->Write(data, size));
     }
   }
-  state.SetBytesProcessed(int64_t(state.iterations()) * sum_sizes);
+  const int64_t total_bytes = static_cast<int64_t>(state.iterations()) * sum_sizes;
+  state.SetBytesProcessed(total_bytes);
+
+  if (reader != nullptr) {
+    // Wake up and stop
+    reader->Stop();
+    reader->Join();
+  }
+  ABORT_NOT_OK(stream->Close());
 }
 
-static void BM_FileOutputStreamSmallWrites(
+// Benchmark writing to /dev/null
+//
+// This situation is unrealistic as the kernel likely doesn't
+// copy the data at all, so we only measure small writes.
+
+static void BM_FileOutputStreamSmallWritesToNull(
     benchmark::State& state) {  // NOLINT non-const reference
   std::shared_ptr<io::OutputStream> stream;
   ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
@@ -58,38 +155,84 @@ static void BM_FileOutputStreamSmallWrites(
   BenchmarkStreamingWrites(state, small_sizes, stream.get());
 }
 
-static void BM_FileOutputStreamLargeWrites(
+static void BM_BufferedOutputStreamSmallWritesToNull(
     benchmark::State& state) {  // NOLINT non-const reference
   std::shared_ptr<io::OutputStream> stream;
   ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
+  stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
 
-  BenchmarkStreamingWrites(state, large_sizes, stream.get());
+  BenchmarkStreamingWrites(state, small_sizes, stream.get());
 }
 
-static void BM_BufferedOutputStreamSmallWrites(
+// Benchmark writing to a pipe
+//
+// This is slightly more realistic than the above
+
+static void BM_FileOutputStreamSmallWritesToPipe(
     benchmark::State& state) {  // NOLINT non-const reference
   std::shared_ptr<io::OutputStream> stream;
-  ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
-  stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+  std::shared_ptr<BackgroundReader> reader;
+  SetupPipeWriter(&stream, &reader);
 
-  BenchmarkStreamingWrites(state, small_sizes, stream.get());
+  BenchmarkStreamingWrites(state, small_sizes, stream.get(), reader.get());
 }
 
-static void BM_BufferedOutputStreamLargeWrites(
+static void BM_FileOutputStreamLargeWritesToPipe(
     benchmark::State& state) {  // NOLINT non-const reference
   std::shared_ptr<io::OutputStream> stream;
-  ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
-  stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+  std::shared_ptr<BackgroundReader> reader;
+  SetupPipeWriter(&stream, &reader);
 
-  BenchmarkStreamingWrites(state, large_sizes, stream.get());
+  BenchmarkStreamingWrites(state, large_sizes, stream.get(), reader.get());
 }
 
-BENCHMARK(BM_FileOutputStreamSmallWrites)->Repetitions(2)->MinTime(1.0);
+static void BM_BufferedOutputStreamSmallWritesToPipe(
+    benchmark::State& state) {  // NOLINT non-const reference
+  std::shared_ptr<io::OutputStream> stream;
+  std::shared_ptr<BackgroundReader> reader;
+  SetupPipeWriter(&stream, &reader);
+  stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+
+  BenchmarkStreamingWrites(state, small_sizes, stream.get(), reader.get());
+}
 
-BENCHMARK(BM_FileOutputStreamLargeWrites)->Repetitions(2)->MinTime(1.0);
+static void BM_BufferedOutputStreamLargeWritesToPipe(
+    benchmark::State& state) {  // NOLINT non-const reference
+  std::shared_ptr<io::OutputStream> stream;
+  std::shared_ptr<BackgroundReader> reader;
+  SetupPipeWriter(&stream, &reader);
+  stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
 
-BENCHMARK(BM_BufferedOutputStreamSmallWrites)->Repetitions(2)->MinTime(1.0);
+  BenchmarkStreamingWrites(state, large_sizes, stream.get(), reader.get());
+}
 
-BENCHMARK(BM_BufferedOutputStreamLargeWrites)->Repetitions(2)->MinTime(1.0);
+// We use real time as we don't want to count CPU time spent in the
+// BackgroundReader thread
+
+BENCHMARK(BM_FileOutputStreamSmallWritesToNull)
+    ->Repetitions(2)
+    ->MinTime(1.0)
+    ->UseRealTime();
+BENCHMARK(BM_FileOutputStreamSmallWritesToPipe)
+    ->Repetitions(2)
+    ->MinTime(1.0)
+    ->UseRealTime();
+BENCHMARK(BM_FileOutputStreamLargeWritesToPipe)
+    ->Repetitions(2)
+    ->MinTime(1.0)
+    ->UseRealTime();
+
+BENCHMARK(BM_BufferedOutputStreamSmallWritesToNull)
+    ->Repetitions(2)
+    ->MinTime(1.0)
+    ->UseRealTime();
+BENCHMARK(BM_BufferedOutputStreamSmallWritesToPipe)
+    ->Repetitions(2)
+    ->MinTime(1.0)
+    ->UseRealTime();
+BENCHMARK(BM_BufferedOutputStreamLargeWritesToPipe)
+    ->Repetitions(2)
+    ->MinTime(1.0)
+    ->UseRealTime();
 
 }  // namespace arrow
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 098e82f..d3ef908 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -36,6 +36,7 @@
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/test-util.h"
+#include "arrow/util/io-util.h"
 
 namespace arrow {
 namespace io {
@@ -43,7 +44,7 @@ namespace io {
 class FileTestFixture : public ::testing::Test {
  public:
   void SetUp() {
-    path_ = "arrow-test-io-file-output-stream.txt";
+    path_ = "arrow-test-io-file.txt";
     EnsureFileDeleted();
   }
 
@@ -68,6 +69,17 @@ class TestFileOutputStream : public FileTestFixture {
     ASSERT_OK(FileOutputStream::Open(path_, append, &file_));
     ASSERT_OK(FileOutputStream::Open(path_, append, &stream_));
   }
+  void OpenFileDescriptor() {
+    internal::PlatformFilename file_name;
+    ASSERT_OK(internal::FileNameFromString(path_, &file_name));
+    int fd_file, fd_stream;
+    ASSERT_OK(internal::FileOpenWriteable(file_name, true /* write_only */,
+                                          false /* truncate */, &fd_file));
+    ASSERT_OK(FileOutputStream::Open(fd_file, &file_));
+    ASSERT_OK(internal::FileOpenWriteable(file_name, true /* write_only */,
+                                          false /* truncate */, &fd_stream));
+    ASSERT_OK(FileOutputStream::Open(fd_stream, &stream_));
+  }
 
  protected:
   std::shared_ptr<FileOutputStream> file_;
@@ -90,19 +102,27 @@ TEST_F(TestFileOutputStream, FileNameWideCharConversionRangeException) {
 #endif
 
 TEST_F(TestFileOutputStream, DestructorClosesFile) {
-  int fd;
-  {
-    std::shared_ptr<FileOutputStream> file;
-    ASSERT_OK(FileOutputStream::Open(path_, &file));
-    fd = file->file_descriptor();
-  }
-  ASSERT_TRUE(FileIsClosed(fd));
-  {
-    std::shared_ptr<OutputStream> stream;
-    ASSERT_OK(FileOutputStream::Open(path_, &stream));
-    fd = std::static_pointer_cast<FileOutputStream>(stream)->file_descriptor();
-  }
-  ASSERT_TRUE(FileIsClosed(fd));
+  int fd_file, fd_stream;
+  // Opened by path: dropping one object must close its descriptor and only that one
+  OpenFile();
+  fd_file = file_->file_descriptor();
+  fd_stream = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor();
+  ASSERT_FALSE(FileIsClosed(fd_file));
+  file_.reset();
+  ASSERT_TRUE(FileIsClosed(fd_file));
+  ASSERT_FALSE(FileIsClosed(fd_stream));
+  stream_.reset();
+  ASSERT_TRUE(FileIsClosed(fd_stream));
+  // Same checks for objects created from already-open file descriptors
+  OpenFileDescriptor();
+  fd_file = file_->file_descriptor();
+  fd_stream = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor();
+  ASSERT_FALSE(FileIsClosed(fd_file));
+  file_.reset();
+  ASSERT_TRUE(FileIsClosed(fd_file));
+  ASSERT_FALSE(FileIsClosed(fd_stream));
+  stream_.reset();
+  ASSERT_TRUE(FileIsClosed(fd_stream));
 }
 
 TEST_F(TestFileOutputStream, Close) {
@@ -132,6 +152,33 @@ TEST_F(TestFileOutputStream, Close) {
   AssertFileContents(path_, data);
 }
 
+TEST_F(TestFileOutputStream, FromFileDescriptor) {
+  OpenFileDescriptor();
+  stream_.reset();  // only file_ is used for the first write
+
+  std::string data1 = "test";
+  ASSERT_OK(file_->Write(data1.data(), data1.size()));
+  int fd = file_->file_descriptor();
+  ASSERT_OK(file_->Close());
+  ASSERT_TRUE(FileIsClosed(fd));  // Close() must close the wrapped descriptor
+
+  AssertFileContents(path_, data1);
+
+  // Re-open at end of file
+  internal::PlatformFilename file_name;
+  ASSERT_OK(internal::FileNameFromString(path_, &file_name));
+  ASSERT_OK(internal::FileOpenWriteable(file_name, true /* write_only */,
+                                        false /* truncate */, &fd));
+  ASSERT_OK(internal::FileSeek(fd, 0, SEEK_END));  // so the next write appends
+  ASSERT_OK(FileOutputStream::Open(fd, &stream_));
+
+  std::string data2 = "data";
+  ASSERT_OK(stream_->Write(data2.data(), data2.size()));
+  ASSERT_OK(stream_->Close());
+
+  AssertFileContents(path_, data1 + data2);  // second write landed after the first
+}
+
 TEST_F(TestFileOutputStream, InvalidWrites) {
   OpenFile();
 
@@ -225,6 +272,32 @@ TEST_F(TestReadableFile, Close) {
 
   // Idempotent
   ASSERT_OK(file_->Close());
+  ASSERT_TRUE(FileIsClosed(fd));
+}
+
+TEST_F(TestReadableFile, FromFileDescriptor) {
+  MakeTestFile();
+
+  internal::PlatformFilename file_name;
+  int fd = -2;  // sentinel, so the ASSERT_GE below proves the open call wrote it
+  ASSERT_OK(internal::FileNameFromString(path_, &file_name));
+  ASSERT_OK(internal::FileOpenReadable(file_name, &fd));
+  ASSERT_GE(fd, 0);
+  ASSERT_OK(internal::FileSeek(fd, 4));  // move the descriptor before wrapping it
+
+  ASSERT_OK(ReadableFile::Open(fd, &file_));
+  ASSERT_EQ(file_->file_descriptor(), fd);
+  std::shared_ptr<Buffer> buf;
+  ASSERT_OK(file_->Read(5, &buf));  // asks for 5 bytes; only 4 are expected back
+  ASSERT_EQ(buf->size(), 4);
+  ASSERT_TRUE(buf->Equals(Buffer("data")));
+
+  ASSERT_FALSE(FileIsClosed(fd));
+  ASSERT_OK(file_->Close());
+  ASSERT_TRUE(FileIsClosed(fd));
+  // Idempotent
+  ASSERT_OK(file_->Close());
+  ASSERT_TRUE(FileIsClosed(fd));
 }
 
 TEST_F(TestReadableFile, SeekTellSize) {
@@ -407,6 +480,65 @@ TEST_F(TestReadableFile, ThreadSafety) {
 }
 
 // ----------------------------------------------------------------------
+// Pipe I/O tests using FileOutputStream
+// (cannot test using ReadableFile as it currently requires seeking)
+
+class TestPipeIO : public ::testing::Test {
+ public:
+  void MakePipe() {
+    int fd[2];
+    ASSERT_OK(internal::CreatePipe(fd));
+    r_ = fd[0];  // read end
+    w_ = fd[1];  // write end
+    ASSERT_GE(r_, 0);
+    ASSERT_GE(w_, 0);
+  }
+  void ClosePipe() {  // closes whichever ends are not marked with -1
+    if (r_ != -1) {
+      ASSERT_OK(internal::FileClose(r_));
+      r_ = -1;
+    }
+    if (w_ != -1) {
+      ASSERT_OK(internal::FileClose(w_));
+      w_ = -1;
+    }
+  }
+  void TearDown() { ClosePipe(); }
+
+ protected:
+  int r_ = -1, w_ = -1;  // pipe descriptors; -1 means "nothing to close here"
+};
+
+TEST_F(TestPipeIO, TestWrite) {
+  std::string data1 = "test", data2 = "data!";
+  std::shared_ptr<FileOutputStream> file;
+  uint8_t buffer[10];
+  int64_t bytes_read;
+
+  MakePipe();
+  ASSERT_OK(FileOutputStream::Open(w_, &file));
+  w_ = -1;  // now owned by FileOutputStream
+
+  ASSERT_OK(file->Write(data1.data(), data1.size()));
+  ASSERT_OK(internal::FileRead(r_, buffer, 4, &bytes_read));
+  ASSERT_EQ(bytes_read, 4);
+  ASSERT_EQ(0, std::memcmp(buffer, "test", 4));
+
+  ASSERT_OK(file->Write(data2.data(), data2.size()));
+  ASSERT_OK(internal::FileRead(r_, buffer, 4, &bytes_read));  // leaves "!" unread
+  ASSERT_EQ(bytes_read, 4);
+  ASSERT_EQ(0, std::memcmp(buffer, "data", 4));
+
+  ASSERT_OK(file->Close());  // closes the write end, so the reader can hit EOF
+  ASSERT_OK(internal::FileRead(r_, buffer, 2, &bytes_read));  // asks for 2, gets "!"
+  ASSERT_EQ(bytes_read, 1);
+  ASSERT_EQ(0, std::memcmp(buffer, "!", 1));
+  // EOF reached
+  ASSERT_OK(internal::FileRead(r_, buffer, 2, &bytes_read));
+  ASSERT_EQ(bytes_read, 0);
+}
+
+// ----------------------------------------------------------------------
 // Memory map tests
 
 class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture {
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 58f82b3..a046228 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -20,6 +20,8 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <cstdlib>
+#include <iostream>
 #include <limits>
 #include <memory>
 #include <random>
@@ -70,7 +72,8 @@
   do {                                   \
     ::arrow::Status _s = (s);            \
     if (ARROW_PREDICT_FALSE(!_s.ok())) { \
-      exit(EXIT_FAILURE);                \
+      std::cerr << _s.ToString() << "\n"; \
+      std::abort();                      \
     }                                    \
   } while (false);
 
diff --git a/cpp/src/arrow/util/io-util.cc b/cpp/src/arrow/util/io-util.cc
new file mode 100644
index 0000000..10a30df
--- /dev/null
+++ b/cpp/src/arrow/util/io-util.cc
@@ -0,0 +1,418 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Ensure 64-bit off_t for platforms where it matters
+#ifdef _FILE_OFFSET_BITS
+#undef _FILE_OFFSET_BITS
+#endif
+
+#define _FILE_OFFSET_BITS 64
+
+#include "arrow/io/windows_compatibility.h"
+
+#include <algorithm>
+#include <cerrno>
+#include <sstream>
+
+#include <fcntl.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>  // IWYU pragma: keep
+
+// Defines that don't exist in MinGW
+#if defined(__MINGW32__)
+#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR
+#elif defined(_MSC_VER)  // Visual Studio
+
+#else  // gcc / clang on POSIX platforms
+#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
+#endif
+
+// For filename conversion
+#if defined(_MSC_VER)
+#include <boost/system/system_error.hpp>  // NOLINT
+#include <codecvt>
+#include <locale>
+#endif
+
+// ----------------------------------------------------------------------
+// file compatibility stuff
+
+#if defined(__MINGW32__)  // MinGW
+// nothing
+#elif defined(_MSC_VER)  // Visual Studio
+#include <io.h>
+#else  // POSIX / Linux
+// nothing
+#endif
+
+#ifndef _MSC_VER  // POSIX-like platforms
+#include <unistd.h>
+#endif  // _MSC_VER
+
+// POSIX systems do not have this
+#ifndef O_BINARY
+#define O_BINARY 0
+#endif
+
+// define max read/write count
+#if defined(_MSC_VER)
+#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
+#else
+
+#ifdef __APPLE__
+// due to macOS bug, we need to set read/write max
+#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
+#else
+// see notes on Linux read/write manpage
+#define ARROW_MAX_IO_CHUNKSIZE 0x7ffff000
+#endif
+
+#endif
+
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+namespace internal {
+// Return IOError from the enclosing function when an lseek-style result is -1
+#define CHECK_LSEEK(retval) \
+  do { if ((retval) == -1) return Status::IOError("lseek failed"); } while (0)
+
+static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) {
+#if !defined(_MSC_VER)
+  return lseek(fd, pos, whence);  // off_t is 64-bit via _FILE_OFFSET_BITS
+#else
+  return _lseeki64(fd, pos, whence);
+#endif
+}
+
+static inline Status CheckFileOpResult(int ret, int errno_actual,
+                                       const PlatformFilename& file_name,
+                                       const char* opname) {
+  if (ret != -1) return Status::OK();
+
+  // Failure: name the operation, the file and the OS error text
+  std::stringstream msg;
+  msg << "Failed to " << opname << " file: " << file_name.string()
+      << " , error: " << std::strerror(errno_actual);
+  return Status::IOError(msg.str());
+}
+
+//
+// File name handling
+//
+
+Status FileNameFromString(const std::string& file_name, PlatformFilename* out) {
+#if defined(_MSC_VER)
+  try {
+    std::codecvt_utf8_utf16<wchar_t> utf16_converter;
+    out->assign(file_name, utf16_converter);  // UTF-8 input -> wide-char path
+  } catch (boost::system::system_error& e) {
+    return Status::Invalid(e.what());  // conversion failure -> Status::Invalid
+  }
+#else
+  *out = internal::PlatformFilename(file_name);  // stored as given, no conversion
+#endif
+  return Status::OK();
+}
+
+//
+// Functions for creating file descriptors
+//
+
+Status FileOpenReadable(const PlatformFilename& file_name, int* fd) {
+  int ret, errno_actual;
+#if defined(_MSC_VER)
+  errno_actual = _wsopen_s(fd, file_name.wstring().c_str(), _O_RDONLY | _O_BINARY,
+                           _SH_DENYNO, _S_IREAD);
+  ret = *fd;  // the call's return value is the error code; the fd is in *fd
+#else
+  ret = *fd = open(file_name.c_str(), O_RDONLY | O_BINARY);
+  errno_actual = errno;  // saved right away, before anything can overwrite errno
+#endif
+  // ret == -1 becomes an IOError that names the file and the OS error
+  return CheckFileOpResult(ret, errno_actual, file_name, "open local");
+}
+
+Status FileOpenWriteable(const PlatformFilename& file_name, bool write_only,
+                         bool truncate, int* fd) {
+  int ret, errno_actual;
+
+#if defined(_MSC_VER)
+  int oflag = _O_CREAT | _O_BINARY;  // always create the file if it is missing
+  int pmode = _S_IWRITE;
+  if (!write_only) {
+    pmode |= _S_IREAD;
+  }
+
+  if (truncate) {
+    oflag |= _O_TRUNC;
+  }
+
+  if (write_only) {
+    oflag |= _O_WRONLY;
+  } else {
+    oflag |= _O_RDWR;
+  }
+
+  errno_actual = _wsopen_s(fd, file_name.wstring().c_str(), oflag, _SH_DENYNO, pmode);
+  ret = *fd;  // the call's return value is the error code; the fd is in *fd
+
+#else
+  int oflag = O_CREAT | O_BINARY;  // always create the file if it is missing
+
+  if (truncate) {
+    oflag |= O_TRUNC;
+  }
+
+  if (write_only) {
+    oflag |= O_WRONLY;
+  } else {
+    oflag |= O_RDWR;
+  }
+
+  ret = *fd = open(file_name.c_str(), oflag, ARROW_WRITE_SHMODE);
+  errno_actual = errno;  // saved right away, before anything can overwrite errno
+#endif
+  return CheckFileOpResult(ret, errno_actual, file_name, "open local");
+}
+
+Status FileTell(int fd, int64_t* pos) {
+#if defined(_MSC_VER)
+  const int64_t offset = _telli64(fd);
+  if (offset == -1) {
+    return Status::IOError("_telli64 failed");
+  }
+#else
+  // A zero-length relative seek reports the current offset
+  const int64_t offset = lseek64_compat(fd, 0, SEEK_CUR);
+  CHECK_LSEEK(offset);
+#endif
+
+  // *pos is only written on success
+  *pos = offset;
+  return Status::OK();
+}
+
+Status CreatePipe(int fd[2]) {
+#if defined(_MSC_VER)
+  const int rc = _pipe(fd, 4096, _O_BINARY);
+#else
+  const int rc = pipe(fd);
+#endif
+
+  if (rc != -1) {
+    return Status::OK();
+  }
+  // The failure reason is taken from errno
+  const std::string reason(strerror(errno));
+  return Status::IOError(std::string("Error creating pipe: ") + reason);
+}
+
+//
+// Closing files
+//
+
+Status FileClose(int fd) {
+#if defined(_MSC_VER)
+  const int rc = static_cast<int>(_close(fd));
+#else
+  const int rc = static_cast<int>(close(fd));
+#endif
+
+  // A result of -1 is reported as an IOError
+  if (rc == -1) {
+    return Status::IOError("error closing file");
+  }
+
+  return Status::OK();
+}
+
+//
+// Seeking and telling
+//
+
+Status FileSeek(int fd, int64_t pos, int whence) {
+  const int64_t new_offset = lseek64_compat(fd, pos, whence);
+  CHECK_LSEEK(new_offset);  // only failure matters; the offset itself is unused
+  return Status::OK();
+}
+// Absolute variant: pos is measured from the start of the file
+Status FileSeek(int fd, int64_t pos) { return FileSeek(fd, pos, SEEK_SET); }
+
+Status FileGetSize(int fd, int64_t* size) {
+  // XXX Should use fstat() instead, but this function also ensures the
+  // file is seekable
+
+  // Remember the current position so it can be put back afterwards
+  const int64_t saved_pos = lseek64_compat(fd, 0, SEEK_CUR);
+  CHECK_LSEEK(saved_pos);
+
+  // Seeking to the end yields the file length as the resulting offset
+  const int64_t end_pos = lseek64_compat(fd, 0, SEEK_END);
+  CHECK_LSEEK(end_pos);
+
+  // The length is stored before repositioning, so it is set even if the
+  // restore below fails
+  *size = end_pos;
+
+  // Put the descriptor back where it was
+  const int64_t restored = lseek64_compat(fd, saved_pos, SEEK_SET);
+  CHECK_LSEEK(restored);
+
+  return Status::OK();
+}
+
+//
+// Reading data
+//
+
+static inline int64_t pread_compat(int fd, void* buf, int64_t nbytes, int64_t pos) {
+#if defined(_MSC_VER)
+  HANDLE handle = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
+  DWORD dwBytesRead = 0;
+  OVERLAPPED overlapped = {0};
+  overlapped.Offset = static_cast<uint32_t>(pos);  // low 32 bits of the offset
+  overlapped.OffsetHigh = static_cast<uint32_t>(pos >> 32);  // high 32 bits
+
+  // Note: ReadFile() will update the file position
+  BOOL bRet =
+      ReadFile(handle, buf, static_cast<uint32_t>(nbytes), &dwBytesRead, &overlapped);
+  if (bRet || GetLastError() == ERROR_HANDLE_EOF) {  // EOF is not an error
+    return dwBytesRead;
+  } else {
+    return -1;  // same failure value as the pread() branch
+  }
+#else
+  return static_cast<int64_t>(
+      pread(fd, buf, static_cast<size_t>(nbytes), static_cast<off_t>(pos)));
+#endif
+}
+
+Status FileRead(int fd, uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
+  const int64_t max_chunk = static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE);
+  int64_t total = 0;
+  *bytes_read = 0;
+
+  // Issue reads of at most max_chunk bytes until the request is satisfied,
+  // the descriptor reports EOF, or a read fails
+  while (total < nbytes) {
+    const int64_t want = std::min(max_chunk, nbytes - total);
+#if defined(_MSC_VER)
+    const int64_t got =
+        static_cast<int64_t>(_read(fd, buffer + total, static_cast<uint32_t>(want)));
+#else
+    const int64_t got =
+        static_cast<int64_t>(read(fd, buffer + total, static_cast<size_t>(want)));
+#endif
+
+    if (got == -1) {
+      // Callers see -1 in *bytes_read together with the error status
+      *bytes_read = -1;
+      return Status::IOError(std::string("Error reading bytes from file: ") +
+                             std::string(strerror(errno)));
+    }
+    if (got == 0) {
+      // EOF
+      break;
+    }
+    total += got;
+    *bytes_read = total;
+  }
+  return Status::OK();
+}
+
+Status FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes,
+                  int64_t* bytes_read) {
+  const int64_t max_chunk = static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE);
+  int64_t total = 0;
+  *bytes_read = 0;
+
+  // Positional reads of at most max_chunk bytes each; stops early at EOF
+  while (total < nbytes) {
+    const int64_t want = std::min(max_chunk, nbytes - total);
+    const int64_t got = pread_compat(fd, buffer + total, want, position + total);
+
+    if (got == -1) {
+      // Callers see -1 in *bytes_read together with the error status
+      *bytes_read = -1;
+      return Status::IOError(std::string("Error reading bytes from file: ") +
+                             std::string(strerror(errno)));
+    }
+    if (got == 0) {
+      // EOF
+      break;
+    }
+    total += got;
+    *bytes_read = total;
+  }
+
+  // Success, possibly with fewer than nbytes bytes when EOF was hit
+  return Status::OK();
+}
+
+//
+// Writing data
+//
+
+Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes) {
+  // Writes all nbytes bytes of buffer to fd, or returns IOError on failure
+  const int64_t max_chunk = static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE);
+  int64_t bytes_written = 0;
+
+  // A single write call may accept fewer bytes than requested, so keep
+  // going until everything is written or a call fails
+  while (bytes_written < nbytes) {
+    const int64_t chunksize = std::min(max_chunk, nbytes - bytes_written);
+#if defined(_MSC_VER)
+    const int64_t ret = static_cast<int64_t>(
+        _write(fd, buffer + bytes_written, static_cast<uint32_t>(chunksize)));
+#else
+    const int64_t ret = static_cast<int64_t>(
+        write(fd, buffer + bytes_written, static_cast<size_t>(chunksize)));
+#endif
+
+    if (ret == -1) {
+      return Status::IOError(std::string("Error writing bytes to file: ") +
+                             std::string(strerror(errno)));
+    }
+    bytes_written += ret;
+  }
+
+  return Status::OK();
+}
+
+Status FileTruncate(int fd, const int64_t size) {
+  int ret, errno_actual;
+  // Pass the size in each API's own 64-bit offset type; size_t can be 32-bit
+#ifdef _MSC_VER
+  errno_actual = _chsize_s(fd, static_cast<__int64>(size));  // returns an errno code
+  ret = errno_actual == 0 ? 0 : -1;
+#else
+  ret = ftruncate(fd, static_cast<off_t>(size));
+  errno_actual = errno;  // captured immediately after the call
+#endif
+
+  if (ret == -1) {
+    return Status::IOError(std::string("Error truncating file: ") +
+                           std::string(strerror(errno_actual)));
+  }
+  return Status::OK();
+}
+
+}  // namespace internal
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
index 2a01be1..e857490 100644
--- a/cpp/src/arrow/util/io-util.h
+++ b/cpp/src/arrow/util/io-util.h
@@ -20,11 +20,16 @@
 
 #include <iostream>
 #include <memory>
+#include <string>
 
 #include "arrow/buffer.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/status.h"
 
+#if defined(_MSC_VER)
+#include <boost/filesystem.hpp>  // NOLINT
+#endif
+
 namespace arrow {
 namespace io {
 
@@ -113,6 +118,52 @@ class StdinStream : public InputStream {
 };
 
 }  // namespace io
+
+namespace internal {
+
+#if defined(_MSC_VER)
+// namespace fs = boost::filesystem;
+// #define PlatformFilename fs::path
+typedef ::boost::filesystem::path PlatformFilename;
+
+#else
+
+struct PlatformFilename {
+  PlatformFilename() {}
+  explicit PlatformFilename(const std::string& path) : utf8_path(path) {}
+
+  const std::string& string() const { return utf8_path; }
+
+  const char* c_str() const { return utf8_path.c_str(); }
+
+  size_t length() const { return utf8_path.length(); }
+
+  std::string utf8_path;
+};
+#endif
+
+Status FileNameFromString(const std::string& file_name, PlatformFilename* out);
+// Open a file by name; the new descriptor is stored in *fd
+Status FileOpenReadable(const PlatformFilename& file_name, int* fd);
+Status FileOpenWriteable(const PlatformFilename& file_name, bool write_only,
+                         bool truncate, int* fd);
+// Data transfer and resizing on an already-open descriptor
+Status FileRead(int fd, uint8_t* buffer, const int64_t nbytes, int64_t* bytes_read);
+Status FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes,
+                  int64_t* bytes_read);
+Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes);
+Status FileTruncate(int fd, const int64_t size);
+// File position and size
+Status FileTell(int fd, int64_t* pos);
+Status FileSeek(int fd, int64_t pos);
+Status FileSeek(int fd, int64_t pos, int whence);
+Status FileGetSize(int fd, int64_t* size);
+// Close a descriptor
+Status FileClose(int fd);
+// Create a pipe: fd[0] is the read end, fd[1] the write end
+Status CreatePipe(int fd[2]);
+
+}  // namespace internal
 }  // namespace arrow
 
 #endif  // ARROW_UTIL_IO_UTIL_H

-- 
To stop receiving notification emails like this one, please contact
uwe@apache.org.