You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/05/30 17:12:30 UTC

[arrow] branch master updated: ARROW-5378: [C++] Local filesystem implementation

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 30b4736  ARROW-5378: [C++] Local filesystem implementation
30b4736 is described below

commit 30b47360732ba483dd7fc818363587fea7972396
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Thu May 30 12:12:22 2019 -0500

    ARROW-5378: [C++] Local filesystem implementation
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #4379 from pitrou/ARROW-5378-local-filesystem and squashes the following commits:
    
    c36e3dc8b <Antoine Pitrou> ARROW-5378:  Local filesystem implementation
---
 cpp/src/arrow/CMakeLists.txt                |   2 +
 cpp/src/arrow/filesystem/CMakeLists.txt     |   1 +
 cpp/src/arrow/filesystem/filesystem-test.cc | 305 +++++++++++++++++++++--
 cpp/src/arrow/filesystem/filesystem.cc      | 131 ++++++++++
 cpp/src/arrow/filesystem/filesystem.h       |  68 ++++-
 cpp/src/arrow/filesystem/localfs-test.cc    | 125 ++++++++++
 cpp/src/arrow/filesystem/localfs.cc         | 372 ++++++++++++++++++++++++++++
 cpp/src/arrow/filesystem/localfs.h          |  67 +++++
 cpp/src/arrow/filesystem/mockfs.cc          |  21 +-
 cpp/src/arrow/filesystem/path-util.cc       |  20 +-
 cpp/src/arrow/filesystem/path-util.h        |   7 +-
 cpp/src/arrow/filesystem/test-util.cc       | 143 ++++++-----
 cpp/src/arrow/filesystem/test-util.h        |  34 +++
 cpp/src/arrow/filesystem/util-internal.cc   |  45 ++++
 cpp/src/arrow/filesystem/util-internal.h    |  36 +++
 cpp/src/arrow/io/file-test.cc               |   8 +
 cpp/src/arrow/io/file.cc                    |  21 +-
 cpp/src/arrow/util/io-util-test.cc          |  82 +++++-
 cpp/src/arrow/util/io-util.cc               | 143 ++++++++---
 cpp/src/arrow/util/io-util.h                |  21 +-
 20 files changed, 1497 insertions(+), 155 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 606041a..c182ddb 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -106,8 +106,10 @@ set(ARROW_SRCS
     csv/parser.cc
     csv/reader.cc
     filesystem/filesystem.cc
+    filesystem/localfs.cc
     filesystem/mockfs.cc
     filesystem/path-util.cc
+    filesystem/util-internal.cc
     json/options.cc
     json/chunked-builder.cc
     json/chunker.cc
diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt
index da5e286..ea3d037 100644
--- a/cpp/src/arrow/filesystem/CMakeLists.txt
+++ b/cpp/src/arrow/filesystem/CMakeLists.txt
@@ -19,3 +19,4 @@
 arrow_install_all_headers("arrow/filesystem")
 
 add_arrow_test(filesystem-test)
+add_arrow_test(localfs-test)
diff --git a/cpp/src/arrow/filesystem/filesystem-test.cc b/cpp/src/arrow/filesystem/filesystem-test.cc
index 4f72e02..fa52f3c 100644
--- a/cpp/src/arrow/filesystem/filesystem-test.cc
+++ b/cpp/src/arrow/filesystem/filesystem-test.cc
@@ -118,6 +118,13 @@ TEST(PathUtil, ConcatAbstractPath) {
   ASSERT_EQ("abc", ConcatAbstractPath("", "abc"));
   ASSERT_EQ("abc/def", ConcatAbstractPath("abc", "def"));
   ASSERT_EQ("abc/def/ghi", ConcatAbstractPath("abc/def", "ghi"));
+
+  ASSERT_EQ("abc/def", ConcatAbstractPath("abc/", "def"));
+  ASSERT_EQ("abc/def/ghi", ConcatAbstractPath("abc/def/", "ghi"));
+
+  ASSERT_EQ("/abc", ConcatAbstractPath("/", "abc"));
+  ASSERT_EQ("/abc/def", ConcatAbstractPath("/abc", "def"));
+  ASSERT_EQ("/abc/def", ConcatAbstractPath("/abc/", "def"));
 }
 
 TEST(PathUtil, JoinAbstractPath) {
@@ -128,6 +135,15 @@ TEST(PathUtil, JoinAbstractPath) {
   ASSERT_EQ("", JoinAbstractPath(parts.begin(), parts.begin()));
 }
 
+TEST(PathUtil, EnsureTrailingSlash) {
+  ASSERT_EQ("", EnsureTrailingSlash(""));
+  ASSERT_EQ("/", EnsureTrailingSlash("/"));
+  ASSERT_EQ("abc/", EnsureTrailingSlash("abc"));
+  ASSERT_EQ("abc/", EnsureTrailingSlash("abc/"));
+  ASSERT_EQ("/abc/", EnsureTrailingSlash("/abc"));
+  ASSERT_EQ("/abc/", EnsureTrailingSlash("/abc/"));
+}
+
 ////////////////////////////////////////////////////////////////////////////
 // Generic MockFileSystem tests
 
@@ -179,10 +195,7 @@ class TestMockFS : public ::testing::Test {
   }
 
   void CreateFile(const std::string& path, const std::string& data) {
-    std::shared_ptr<io::OutputStream> stream;
-    ASSERT_OK(fs_->OpenAppendStream(path, &stream));
-    ASSERT_OK(WriteString(stream.get(), data));
-    ASSERT_OK(stream->Close());
+    ::arrow::fs::CreateFile(fs_.get(), path, data);
   }
 
  protected:
@@ -190,23 +203,6 @@ class TestMockFS : public ::testing::Test {
   std::shared_ptr<MockFileSystem> fs_;
 };
 
-void AssertFileStats(const FileStats& st, const std::string& path, FileType type) {
-  ASSERT_EQ(st.path(), path);
-  ASSERT_EQ(st.type(), type);
-}
-
-void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
-                     TimePoint mtime) {
-  AssertFileStats(st, path, type);
-  ASSERT_EQ(st.mtime(), mtime);
-}
-
-void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
-                     TimePoint mtime, int64_t size) {
-  AssertFileStats(st, path, type, mtime);
-  ASSERT_EQ(st.size(), size);
-}
-
 TEST_F(TestMockFS, Empty) {
   CheckDirs({});
   CheckFiles({});
@@ -317,6 +313,10 @@ TEST_F(TestMockFS, GetTargetStatsSelector) {
   AssertFileStats(stats[0], "AB", FileType::Directory, time_);
   AssertFileStats(stats[1], "AB/CD", FileType::Directory, time_);
   AssertFileStats(stats[2], "ab", FileType::File, time_, 4);
+
+  // Invalid path
+  s.base_dir = "//foo//bar//baz//";
+  ASSERT_RAISES(Invalid, fs_->GetTargetStats(s, &stats));
 }
 
 TEST_F(TestMockFS, OpenOutputStream) {
@@ -342,6 +342,269 @@ TEST_F(TestMockFS, OpenAppendStream) {
   CheckFiles({{"ab", time_, "some data"}});
 }
 
+////////////////////////////////////////////////////////////////////////////
+// Concrete SubTreeFileSystem tests
+
+class TestSubTreeFileSystem : public TestMockFS {
+ public:
+  void SetUp() override {
+    TestMockFS::SetUp();
+    ASSERT_OK(fs_->CreateDir("sub/tree"));
+    subfs_ = std::make_shared<SubTreeFileSystem>("sub/tree", fs_);
+  }
+
+  void CreateFile(const std::string& path, const std::string& data) {
+    ::arrow::fs::CreateFile(subfs_.get(), path, data);
+  }
+
+ protected:
+  std::shared_ptr<SubTreeFileSystem> subfs_;
+};
+
+TEST_F(TestSubTreeFileSystem, CreateDir) {
+  ASSERT_OK(subfs_->CreateDir("AB"));
+  ASSERT_OK(subfs_->CreateDir("AB/CD/EF"));  // Recursive
+  // Non-recursive, parent doesn't exist
+  ASSERT_RAISES(IOError, subfs_->CreateDir("AB/GH/IJ", false /* recursive */));
+  ASSERT_OK(subfs_->CreateDir("AB/GH", false /* recursive */));
+  ASSERT_OK(subfs_->CreateDir("AB/GH/IJ", false /* recursive */));
+  // Can't create root dir
+  ASSERT_RAISES(IOError, subfs_->CreateDir(""));
+  CheckDirs({{"sub", time_},
+             {"sub/tree", time_},
+             {"sub/tree/AB", time_},
+             {"sub/tree/AB/CD", time_},
+             {"sub/tree/AB/CD/EF", time_},
+             {"sub/tree/AB/GH", time_},
+             {"sub/tree/AB/GH/IJ", time_}});
+  CheckFiles({});
+}
+
+TEST_F(TestSubTreeFileSystem, DeleteDir) {
+  ASSERT_OK(subfs_->CreateDir("AB/CD/EF"));
+  ASSERT_OK(subfs_->CreateDir("AB/GH/IJ"));
+  ASSERT_OK(subfs_->DeleteDir("AB/CD"));
+  ASSERT_OK(subfs_->DeleteDir("AB/GH/IJ"));
+  CheckDirs({{"sub", time_},
+             {"sub/tree", time_},
+             {"sub/tree/AB", time_},
+             {"sub/tree/AB/GH", time_}});
+  CheckFiles({});
+  ASSERT_RAISES(IOError, subfs_->DeleteDir("AB/CD"));
+  ASSERT_OK(subfs_->DeleteDir("AB"));
+  CheckDirs({{"sub", time_}, {"sub/tree", time_}});
+  CheckFiles({});
+
+  // Can't delete root dir
+  ASSERT_RAISES(IOError, subfs_->DeleteDir(""));
+  CheckDirs({{"sub", time_}, {"sub/tree", time_}});
+  CheckFiles({});
+}
+
+TEST_F(TestSubTreeFileSystem, DeleteFile) {
+  ASSERT_OK(subfs_->CreateDir("AB"));
+
+  CreateFile("ab", "");
+  CheckFiles({{"sub/tree/ab", time_, ""}});
+  ASSERT_OK(subfs_->DeleteFile("ab"));
+  CheckFiles({});
+
+  CreateFile("AB/cd", "");
+  CheckFiles({{"sub/tree/AB/cd", time_, ""}});
+  ASSERT_OK(subfs_->DeleteFile("AB/cd"));
+  CheckFiles({});
+
+  ASSERT_RAISES(IOError, subfs_->DeleteFile("non-existent"));
+  ASSERT_RAISES(IOError, subfs_->DeleteFile(""));
+}
+
+TEST_F(TestSubTreeFileSystem, MoveFile) {
+  CreateFile("ab", "");
+  CheckFiles({{"sub/tree/ab", time_, ""}});
+  ASSERT_OK(subfs_->Move("ab", "cd"));
+  CheckFiles({{"sub/tree/cd", time_, ""}});
+
+  ASSERT_OK(subfs_->CreateDir("AB"));
+  ASSERT_OK(subfs_->Move("cd", "AB/ef"));
+  CheckFiles({{"sub/tree/AB/ef", time_, ""}});
+
+  ASSERT_RAISES(IOError, subfs_->Move("AB/ef", ""));
+  ASSERT_RAISES(IOError, subfs_->Move("", "xxx"));
+  CheckFiles({{"sub/tree/AB/ef", time_, ""}});
+  CheckDirs({{"sub", time_}, {"sub/tree", time_}, {"sub/tree/AB", time_}});
+}
+
+TEST_F(TestSubTreeFileSystem, MoveDir) {
+  ASSERT_OK(subfs_->CreateDir("AB/CD/EF"));
+  ASSERT_OK(subfs_->Move("AB/CD", "GH"));
+  CheckDirs({{"sub", time_},
+             {"sub/tree", time_},
+             {"sub/tree/AB", time_},
+             {"sub/tree/GH", time_},
+             {"sub/tree/GH/EF", time_}});
+
+  ASSERT_RAISES(IOError, subfs_->Move("AB", ""));
+}
+
+TEST_F(TestSubTreeFileSystem, CopyFile) {
+  CreateFile("ab", "data");
+  CheckFiles({{"sub/tree/ab", time_, "data"}});
+  ASSERT_OK(subfs_->CopyFile("ab", "cd"));
+  CheckFiles({{"sub/tree/ab", time_, "data"}, {"sub/tree/cd", time_, "data"}});
+
+  ASSERT_OK(subfs_->CreateDir("AB"));
+  ASSERT_OK(subfs_->CopyFile("cd", "AB/ef"));
+  CheckFiles({{"sub/tree/AB/ef", time_, "data"},
+              {"sub/tree/ab", time_, "data"},
+              {"sub/tree/cd", time_, "data"}});
+
+  ASSERT_RAISES(IOError, subfs_->CopyFile("ab", ""));
+  ASSERT_RAISES(IOError, subfs_->CopyFile("", "xxx"));
+  CheckFiles({{"sub/tree/AB/ef", time_, "data"},
+              {"sub/tree/ab", time_, "data"},
+              {"sub/tree/cd", time_, "data"}});
+}
+
+TEST_F(TestSubTreeFileSystem, OpenInputStream) {
+  std::shared_ptr<io::InputStream> stream;
+  std::shared_ptr<Buffer> buffer;
+  CreateFile("ab", "data");
+
+  ASSERT_OK(subfs_->OpenInputStream("ab", &stream));
+  ASSERT_OK(stream->Read(4, &buffer));
+  AssertBufferEqual(*buffer, "data");
+  ASSERT_OK(stream->Close());
+
+  ASSERT_RAISES(IOError, subfs_->OpenInputStream("non-existent", &stream));
+  ASSERT_RAISES(IOError, subfs_->OpenInputStream("", &stream));
+}
+
+TEST_F(TestSubTreeFileSystem, OpenInputFile) {
+  std::shared_ptr<io::RandomAccessFile> stream;
+  std::shared_ptr<Buffer> buffer;
+  CreateFile("ab", "some data");
+
+  ASSERT_OK(subfs_->OpenInputFile("ab", &stream));
+  ASSERT_OK(stream->ReadAt(5, 4, &buffer));
+  AssertBufferEqual(*buffer, "data");
+  ASSERT_OK(stream->Close());
+
+  ASSERT_RAISES(IOError, subfs_->OpenInputFile("non-existent", &stream));
+  ASSERT_RAISES(IOError, subfs_->OpenInputFile("", &stream));
+}
+
+TEST_F(TestSubTreeFileSystem, OpenOutputStream) {
+  std::shared_ptr<io::OutputStream> stream;
+
+  ASSERT_OK(subfs_->OpenOutputStream("ab", &stream));
+  ASSERT_OK(stream->Write("data"));
+  ASSERT_OK(stream->Close());
+  CheckFiles({{"sub/tree/ab", time_, "data"}});
+
+  ASSERT_OK(subfs_->CreateDir("AB"));
+  ASSERT_OK(subfs_->OpenOutputStream("AB/cd", &stream));
+  ASSERT_OK(stream->Write("other"));
+  ASSERT_OK(stream->Close());
+  CheckFiles({{"sub/tree/AB/cd", time_, "other"}, {"sub/tree/ab", time_, "data"}});
+
+  ASSERT_RAISES(IOError, subfs_->OpenOutputStream("non-existent/xxx", &stream));
+  ASSERT_RAISES(IOError, subfs_->OpenOutputStream("AB", &stream));
+  ASSERT_RAISES(IOError, subfs_->OpenOutputStream("", &stream));
+  CheckFiles({{"sub/tree/AB/cd", time_, "other"}, {"sub/tree/ab", time_, "data"}});
+}
+
+TEST_F(TestSubTreeFileSystem, OpenAppendStream) {
+  std::shared_ptr<io::OutputStream> stream;
+
+  ASSERT_OK(subfs_->OpenAppendStream("ab", &stream));
+  ASSERT_OK(stream->Write("some"));
+  ASSERT_OK(stream->Close());
+  CheckFiles({{"sub/tree/ab", time_, "some"}});
+
+  ASSERT_OK(subfs_->OpenAppendStream("ab", &stream));
+  ASSERT_OK(stream->Write(" data"));
+  ASSERT_OK(stream->Close());
+  CheckFiles({{"sub/tree/ab", time_, "some data"}});
+}
+
+TEST_F(TestSubTreeFileSystem, GetTargetStatsSingle) {
+  FileStats st;
+  ASSERT_OK(subfs_->CreateDir("AB/CD"));
+
+  ASSERT_OK(subfs_->GetTargetStats("AB", &st));
+  AssertFileStats(st, "AB", FileType::Directory, time_);
+  ASSERT_OK(subfs_->GetTargetStats("AB/CD", &st));
+  AssertFileStats(st, "AB/CD", FileType::Directory, time_);
+
+  CreateFile("ab", "data");
+  ASSERT_OK(subfs_->GetTargetStats("ab", &st));
+  AssertFileStats(st, "ab", FileType::File, time_, 4);
+
+  ASSERT_OK(subfs_->GetTargetStats("non-existent", &st));
+  AssertFileStats(st, "non-existent", FileType::NonExistent);
+}
+
+TEST_F(TestSubTreeFileSystem, GetTargetStatsVector) {
+  std::vector<FileStats> stats;
+
+  ASSERT_OK(subfs_->CreateDir("AB/CD"));
+  CreateFile("ab", "data");
+  CreateFile("AB/cd", "other data");
+
+  ASSERT_OK(subfs_->GetTargetStats({"ab", "AB", "AB/cd", "non-existent"}, &stats));
+  ASSERT_EQ(stats.size(), 4);
+  AssertFileStats(stats[0], "ab", FileType::File, time_, 4);
+  AssertFileStats(stats[1], "AB", FileType::Directory, time_);
+  AssertFileStats(stats[2], "AB/cd", FileType::File, time_, 10);
+  AssertFileStats(stats[3], "non-existent", FileType::NonExistent);
+}
+
+TEST_F(TestSubTreeFileSystem, GetTargetStatsSelector) {
+  std::vector<FileStats> stats;
+  Selector selector;
+
+  ASSERT_OK(subfs_->CreateDir("AB/CD"));
+  CreateFile("ab", "data");
+  CreateFile("AB/cd", "data2");
+  CreateFile("AB/CD/ef", "data34");
+
+  selector.base_dir = "AB";
+  selector.recursive = false;
+  ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+  ASSERT_EQ(stats.size(), 2);
+  AssertFileStats(stats[0], "AB/CD", FileType::Directory, time_);
+  AssertFileStats(stats[1], "AB/cd", FileType::File, time_, 5);
+
+  selector.recursive = true;
+  ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+  ASSERT_EQ(stats.size(), 3);
+  AssertFileStats(stats[0], "AB/CD", FileType::Directory, time_);
+  AssertFileStats(stats[1], "AB/CD/ef", FileType::File, time_, 6);
+  AssertFileStats(stats[2], "AB/cd", FileType::File, time_, 5);
+
+  selector.base_dir = "";
+  selector.recursive = false;
+  ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+  ASSERT_EQ(stats.size(), 2);
+  AssertFileStats(stats[0], "AB", FileType::Directory, time_);
+  AssertFileStats(stats[1], "ab", FileType::File, time_, 4);
+
+  selector.recursive = true;
+  ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+  ASSERT_EQ(stats.size(), 5);
+  AssertFileStats(stats[0], "AB", FileType::Directory, time_);
+  AssertFileStats(stats[1], "AB/CD", FileType::Directory, time_);
+  AssertFileStats(stats[2], "AB/CD/ef", FileType::File, time_, 6);
+  AssertFileStats(stats[3], "AB/cd", FileType::File, time_, 5);
+  AssertFileStats(stats[4], "ab", FileType::File, time_, 4);
+
+  selector.base_dir = "non-existent";
+  ASSERT_RAISES(IOError, subfs_->GetTargetStats(selector, &stats));
+  selector.allow_non_existent = true;
+  ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+  ASSERT_EQ(stats.size(), 0);
+}
+
 }  // namespace internal
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc
index 0ca9dec..1e35ccd 100644
--- a/cpp/src/arrow/filesystem/filesystem.cc
+++ b/cpp/src/arrow/filesystem/filesystem.cc
@@ -24,6 +24,11 @@
 namespace arrow {
 namespace fs {
 
+using internal::ConcatAbstractPath;
+using internal::EnsureTrailingSlash;
+using internal::GetAbstractPathParent;
+using internal::kSep;
+
 std::string ToString(FileType ftype) {
   switch (ftype) {
     case FileType::NonExistent:
@@ -70,5 +75,131 @@ Status FileSystem::DeleteFiles(const std::vector<std::string>& paths) {
   return st;
 }
 
+//////////////////////////////////////////////////////////////////////////
+// SubTreeFileSystem implementation
+
+// FIXME EnsureTrailingSlash works on abstract paths... but we will be
+// passing a concrete path, e.g. "C:" on Windows.
+
+SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path,
+                                     std::shared_ptr<FileSystem> base_fs)
+    : base_path_(EnsureTrailingSlash(base_path)), base_fs_(base_fs) {}
+
+SubTreeFileSystem::~SubTreeFileSystem() {}
+
+std::string SubTreeFileSystem::PrependBase(const std::string& s) const {
+  if (s.empty()) {
+    return base_path_;
+  } else {
+    return ConcatAbstractPath(base_path_, s);
+  }
+}
+
+Status SubTreeFileSystem::PrependBaseNonEmpty(std::string* s) const {
+  if (s->empty()) {
+    return Status::IOError("Empty path");
+  } else {
+    *s = ConcatAbstractPath(base_path_, *s);
+    return Status::OK();
+  }
+}
+
+Status SubTreeFileSystem::StripBase(const std::string& s, std::string* out) const {
+  auto len = base_path_.length();
+  // Note base_path_ ends with a slash (if not empty)
+  if (s.length() >= len && s.substr(0, len) == base_path_) {
+    *out = s.substr(len);
+    return Status::OK();
+  } else {
+    return Status::UnknownError("Underlying filesystem returned path '", s,
+                                "', which is not a subpath of '", base_path_, "'");
+  }
+}
+
+Status SubTreeFileSystem::FixStats(FileStats* st) const {
+  std::string fixed_path;
+  RETURN_NOT_OK(StripBase(st->path(), &fixed_path));
+  st->set_path(fixed_path);
+  return Status::OK();
+}
+
+Status SubTreeFileSystem::GetTargetStats(const std::string& path, FileStats* out) {
+  RETURN_NOT_OK(base_fs_->GetTargetStats(PrependBase(path), out));
+  return FixStats(out);
+}
+
+Status SubTreeFileSystem::GetTargetStats(const Selector& select,
+                                         std::vector<FileStats>* out) {
+  auto selector = select;
+  selector.base_dir = PrependBase(selector.base_dir);
+  RETURN_NOT_OK(base_fs_->GetTargetStats(selector, out));
+  for (auto& st : *out) {
+    RETURN_NOT_OK(FixStats(&st));
+  }
+  return Status::OK();
+}
+
+Status SubTreeFileSystem::CreateDir(const std::string& path, bool recursive) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->CreateDir(s, recursive);
+}
+
+Status SubTreeFileSystem::DeleteDir(const std::string& path) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->DeleteDir(s);
+}
+
+Status SubTreeFileSystem::DeleteFile(const std::string& path) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->DeleteFile(s);
+}
+
+Status SubTreeFileSystem::Move(const std::string& src, const std::string& dest) {
+  auto s = src;
+  auto d = dest;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  RETURN_NOT_OK(PrependBaseNonEmpty(&d));
+  return base_fs_->Move(s, d);
+}
+
+Status SubTreeFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  auto s = src;
+  auto d = dest;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  RETURN_NOT_OK(PrependBaseNonEmpty(&d));
+  return base_fs_->CopyFile(s, d);
+}
+
+Status SubTreeFileSystem::OpenInputStream(const std::string& path,
+                                          std::shared_ptr<io::InputStream>* out) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->OpenInputStream(s, out);
+}
+
+Status SubTreeFileSystem::OpenInputFile(const std::string& path,
+                                        std::shared_ptr<io::RandomAccessFile>* out) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->OpenInputFile(s, out);
+}
+
+Status SubTreeFileSystem::OpenOutputStream(const std::string& path,
+                                           std::shared_ptr<io::OutputStream>* out) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->OpenOutputStream(s, out);
+}
+
+Status SubTreeFileSystem::OpenAppendStream(const std::string& path,
+                                           std::shared_ptr<io::OutputStream>* out) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->OpenAppendStream(s, out);
+}
+
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index e9783e7..9a3e5a0 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -27,6 +27,17 @@
 #include "arrow/util/compression.h"
 #include "arrow/util/visibility.h"
 
+// The Windows API defines *File macros that resolve to either
+// *FileA or *FileW.  We need to undefine them.
+#ifdef _WIN32
+#ifdef DeleteFile
+#undef DeleteFile
+#endif
+#ifdef CopyFile
+#undef CopyFile
+#endif
+#endif
+
 namespace arrow {
 
 namespace io {
@@ -134,6 +145,8 @@ class ARROW_EXPORT FileSystem {
   virtual Status GetTargetStats(const Selector& select, std::vector<FileStats>* out) = 0;
 
   /// Create a directory and subdirectories.
+  ///
+  /// This function succeeds if the directory already exists.
   virtual Status CreateDir(const std::string& path, bool recursive = true) = 0;
 
   /// Delete a directory and its contents, recursively.
@@ -148,8 +161,10 @@ class ARROW_EXPORT FileSystem {
 
   /// Move / rename a file or directory.
   ///
-  /// If the destination exists and is a directory, an error is returned.
-  /// Otherwise, it is replaced.
+  /// If the destination exists:
+  /// - if it is a non-empty directory, an error is returned
+  /// - otherwise, if it has the same type as the source, it is replaced
+  /// - otherwise, behavior is unspecified (implementation-dependent).
   virtual Status Move(const std::string& src, const std::string& dest) = 0;
 
   /// Copy a file.
@@ -179,5 +194,54 @@ class ARROW_EXPORT FileSystem {
                                   std::shared_ptr<io::OutputStream>* out) = 0;
 };
 
+/// \brief EXPERIMENTAL: a FileSystem implementation that delegates to another
+/// implementation after prepending a fixed base path.
+///
+/// This is useful to expose a logical view of a subtree of a filesystem,
+/// for example a directory in a LocalFileSystem.
+/// This makes no security guarantee.  For example, symlinks may allow one to
+/// "escape" the subtree and access other parts of the underlying filesystem.
+class ARROW_EXPORT SubTreeFileSystem : public FileSystem {
+ public:
+  explicit SubTreeFileSystem(const std::string& base_path,
+                             std::shared_ptr<FileSystem> base_fs);
+  ~SubTreeFileSystem() override;
+
+  using FileSystem::GetTargetStats;
+  Status GetTargetStats(const std::string& path, FileStats* out) override;
+  Status GetTargetStats(const Selector& select, std::vector<FileStats>* out) override;
+
+  Status CreateDir(const std::string& path, bool recursive = true) override;
+
+  Status DeleteDir(const std::string& path) override;
+
+  Status DeleteFile(const std::string& path) override;
+
+  Status Move(const std::string& src, const std::string& dest) override;
+
+  Status CopyFile(const std::string& src, const std::string& dest) override;
+
+  Status OpenInputStream(const std::string& path,
+                         std::shared_ptr<io::InputStream>* out) override;
+
+  Status OpenInputFile(const std::string& path,
+                       std::shared_ptr<io::RandomAccessFile>* out) override;
+
+  Status OpenOutputStream(const std::string& path,
+                          std::shared_ptr<io::OutputStream>* out) override;
+
+  Status OpenAppendStream(const std::string& path,
+                          std::shared_ptr<io::OutputStream>* out) override;
+
+ protected:
+  const std::string base_path_;
+  std::shared_ptr<FileSystem> base_fs_;
+
+  std::string PrependBase(const std::string& s) const;
+  Status PrependBaseNonEmpty(std::string* s) const;
+  Status StripBase(const std::string& s, std::string* out) const;
+  Status FixStats(FileStats* st) const;
+};
+
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/localfs-test.cc b/cpp/src/arrow/filesystem/localfs-test.cc
new file mode 100644
index 0000000..ef36ea4
--- /dev/null
+++ b/cpp/src/arrow/filesystem/localfs-test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/test-util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+using ::arrow::internal::TemporaryDir;
+
+TimePoint CurrentTimePoint() {
+  auto now = std::chrono::system_clock::now();
+  return TimePoint(
+      std::chrono::duration_cast<TimePoint::duration>(now.time_since_epoch()));
+}
+
+////////////////////////////////////////////////////////////////////////////
+// Generic LocalFileSystem tests
+
+class TestLocalFSGeneric : public ::testing::Test, public GenericFileSystemTest {
+ public:
+  void SetUp() override {
+    ASSERT_OK(TemporaryDir::Make("test-localfs-", &temp_dir_));
+    local_fs_ = std::make_shared<LocalFileSystem>();
+    fs_ = std::make_shared<SubTreeFileSystem>(temp_dir_->path().ToString(), local_fs_);
+  }
+
+ protected:
+  std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return fs_; }
+
+  std::unique_ptr<TemporaryDir> temp_dir_;
+  std::shared_ptr<LocalFileSystem> local_fs_;
+  std::shared_ptr<FileSystem> fs_;
+};
+
+GENERIC_FS_TEST_FUNCTIONS(TestLocalFSGeneric);
+
+////////////////////////////////////////////////////////////////////////////
+// Concrete LocalFileSystem tests
+
+class TestLocalFS : public ::testing::Test {
+ public:
+  void SetUp() {
+    ASSERT_OK(TemporaryDir::Make("test-localfs-", &temp_dir_));
+    local_fs_ = std::make_shared<LocalFileSystem>();
+    fs_ = std::make_shared<SubTreeFileSystem>(temp_dir_->path().ToString(), local_fs_);
+  }
+
+ protected:
+  std::unique_ptr<TemporaryDir> temp_dir_;
+  std::shared_ptr<LocalFileSystem> local_fs_;
+  std::shared_ptr<FileSystem> fs_;
+};
+
+TEST_F(TestLocalFS, DirectoryMTime) {
+  TimePoint t1 = CurrentTimePoint();
+  ASSERT_OK(fs_->CreateDir("AB/CD/EF"));
+  TimePoint t2 = CurrentTimePoint();
+
+  std::vector<FileStats> stats;
+  ASSERT_OK(fs_->GetTargetStats({"AB", "AB/CD/EF"}, &stats));
+  ASSERT_EQ(stats.size(), 2);
+  AssertFileStats(stats[0], "AB", FileType::Directory);
+  AssertFileStats(stats[1], "AB/CD/EF", FileType::Directory);
+
+  // NOTE: creating AB/CD updates AB's modification time, but creating
+  // AB/CD/EF doesn't.  So AB/CD/EF's modification time should always be
+  // the same as or after AB's modification time.
+  AssertDurationBetween(stats[1].mtime() - stats[0].mtime(), 0, kTimeSlack);
+  // Depending on filesystem time granularity, the recorded time could be
+  // before the system time when doing the modification.
+  AssertDurationBetween(stats[0].mtime() - t1, -kTimeSlack, kTimeSlack);
+  AssertDurationBetween(t2 - stats[1].mtime(), -kTimeSlack, kTimeSlack);
+}
+
+TEST_F(TestLocalFS, FileMTime) {
+  TimePoint t1 = CurrentTimePoint();
+  ASSERT_OK(fs_->CreateDir("AB/CD"));
+  CreateFile(fs_.get(), "AB/CD/ab", "data");
+  TimePoint t2 = CurrentTimePoint();
+
+  std::vector<FileStats> stats;
+  ASSERT_OK(fs_->GetTargetStats({"AB", "AB/CD/ab"}, &stats));
+  ASSERT_EQ(stats.size(), 2);
+  AssertFileStats(stats[0], "AB", FileType::Directory);
+  AssertFileStats(stats[1], "AB/CD/ab", FileType::File, 4);
+
+  AssertDurationBetween(stats[1].mtime() - stats[0].mtime(), 0, kTimeSlack);
+  AssertDurationBetween(stats[0].mtime() - t1, -kTimeSlack, kTimeSlack);
+  AssertDurationBetween(t2 - stats[1].mtime(), -kTimeSlack, kTimeSlack);
+}
+
+// TODO check UNC paths on Windows, e.g. "\\server\share\path\file" (networked)
+// or "\\?\C:\path\file" (extended-length paths)
+
+}  // namespace internal
+}  // namespace fs
+}  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc
new file mode 100644
index 0000000..4ba07e7
--- /dev/null
+++ b/cpp/src/arrow/filesystem/localfs.cc
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstring>
+#include <utility>
+
+#ifdef _WIN32
+#include "arrow/util/windows_compatibility.h"
+#else
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#endif
+
+#include <boost/filesystem.hpp>
+
+#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/util-internal.h"
+#include "arrow/io/file.h"
+#include "arrow/util/io-util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace fs {
+
+namespace bfs = ::boost::filesystem;
+
+using ::arrow::internal::NativePathString;
+using ::arrow::internal::PlatformFilename;
+
+namespace {
+
+// Wrap Boost.Filesystem calls between these two macros so that a thrown
+// bfs::filesystem_error makes the enclosing function return an IOError
+// Status (through ToStatus) instead of letting the exception propagate.
+// The exception is only read, hence it is caught by const reference.
+#define BOOST_FILESYSTEM_TRY try {
+#define BOOST_FILESYSTEM_CATCH                \
+  }                                           \
+  catch (const bfs::filesystem_error& _err) { \
+    return ToStatus(_err);                    \
+  }
+
+// NOTE: catching filesystem_error gives more context than system::error_code
+// (it includes the file path(s) in the error message)
+
+// Turn a Boost.Filesystem exception into an IOError status whose message is
+// the exception's what() text.
+Status ToStatus(const bfs::filesystem_error& err) {
+  const char* message = err.what();
+  return Status::IOError(message);
+}
+
+// Build an IOError from the given message parts, followed by the textual
+// description of the current errno value.  The description is appended
+// directly after the last part, so callers supply any separator themselves.
+template <typename... Args>
+Status ErrnoToStatus(Args&&... args) {
+  const int errnum = errno;
+  return Status::IOError(std::forward<Args>(args)...,
+                         ::arrow::internal::ErrnoMessage(errnum));
+}
+
+#ifdef _WIN32
+
+// Convert a native (wide) path string to a std::string by going through
+// PlatformFilename::ToString().
+std::string NativeToString(const NativePathString& ns) {
+  return PlatformFilename(ns).ToString();
+}
+
+// Build an IOError from the given message parts, followed by the textual
+// description of the calling thread's last Win32 error code.  The description
+// is appended directly after the last part, so callers supply any separator.
+template <typename... Args>
+Status WinErrorToStatus(Args&&... args) {
+  const DWORD last_error = GetLastError();
+  return Status::IOError(std::forward<Args>(args)...,
+                         ::arrow::internal::WinErrorMessage(last_error));
+}
+
+// Convert a Win32 FILETIME (count of 100 ns intervals since January 1, 1601
+// UTC) to a TimePoint relative to the Unix epoch.
+TimePoint ToTimePoint(FILETIME ft) {
+  // Hundreds of nanoseconds between January 1, 1601 (UTC) and the Unix epoch.
+  static constexpr int64_t kFileTimeEpoch = 11644473600LL * 10000000;
+
+  // Reassemble the 64-bit tick count from its two 32-bit halves
+  const int64_t high_part = static_cast<int64_t>(ft.dwHighDateTime) << 32;
+  const int64_t ticks_since_1601 = high_part + ft.dwLowDateTime;
+  // Rebase onto the Unix epoch, then scale 100 ns ticks to nanoseconds
+  const int64_t ticks_since_unix_epoch = ticks_since_1601 - kFileTimeEpoch;
+  const std::chrono::nanoseconds since_epoch(100 * ticks_since_unix_epoch);
+  return TimePoint(std::chrono::duration_cast<TimePoint::duration>(since_epoch));
+}
+
+// Translate Win32 file information into a FileStats (type, size and mtime).
+// The path is not set here; the caller fills it in.
+FileStats FileInformationToFileStat(const BY_HANDLE_FILE_INFORMATION& info) {
+  const bool is_directory = (info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0;
+
+  FileStats result;
+  if (is_directory) {
+    result.set_type(FileType::Directory);
+    result.set_size(kNoSize);
+  } else {
+    // Anything that is not a directory is reported as a regular file, with
+    // its size rebuilt from the two 32-bit halves.
+    const int64_t size_high = static_cast<int64_t>(info.nFileSizeHigh) << 32;
+    result.set_type(FileType::File);
+    result.set_size(size_high + info.nFileSizeLow);
+  }
+  result.set_mtime(ToTimePoint(info.ftLastWriteTime));
+  return result;
+}
+
+// Query type, size and modification time of `path` through the Win32 API.
+//
+// A missing file or missing path component is not an error: `out` is then
+// filled with FileType::NonExistent, kNoTime and kNoSize and OK is returned.
+// Any other failure yields an IOError.  `out`'s path is set to the
+// byte-string form of `path` on every successful return.
+Status StatFile(const std::wstring& path, FileStats* out) {
+  HANDLE h;
+  std::string bytes_path = NativeToString(path);
+
+  /* Inspired by CPython, see Modules/posixmodule.c */
+  h = CreateFileW(path.c_str(), FILE_READ_ATTRIBUTES, /* desired access */
+                  0,                                  /* share mode */
+                  NULL,                               /* security attributes */
+                  OPEN_EXISTING,
+                  /* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */
+                  FILE_ATTRIBUTE_NORMAL | FILE_FLAG_BACKUP_SEMANTICS, NULL);
+
+  if (h == INVALID_HANDLE_VALUE) {
+    DWORD err = GetLastError();
+    if (err == ERROR_FILE_NOT_FOUND || err == ERROR_PATH_NOT_FOUND) {
+      out->set_path(bytes_path);
+      out->set_type(FileType::NonExistent);
+      out->set_mtime(kNoTime);
+      out->set_size(kNoSize);
+      return Status::OK();
+    } else {
+      // WinErrorToStatus appends the system message right after the last
+      // argument, so end with ": " to keep the two parts apart.
+      return WinErrorToStatus("Failed querying information for path '", bytes_path,
+                              "': ");
+    }
+  }
+  BY_HANDLE_FILE_INFORMATION info;
+  if (!GetFileInformationByHandle(h, &info)) {
+    // Build the status (which reads the last error) before CloseHandle runs
+    Status st =
+        WinErrorToStatus("Failed querying information for path '", bytes_path, "': ");
+    CloseHandle(h);
+    return st;
+  }
+  CloseHandle(h);
+  *out = FileInformationToFileStat(info);
+  out->set_path(bytes_path);
+  return Status::OK();
+}
+
+#else  // POSIX systems
+
+// Convert a POSIX timespec (seconds + nanoseconds since the Unix epoch) to a
+// TimePoint.
+TimePoint ToTimePoint(const struct timespec& s) {
+  const int64_t whole_seconds_ns = static_cast<int64_t>(s.tv_sec) * 1000000000;
+  const int64_t fraction_ns = static_cast<int64_t>(s.tv_nsec);
+  const std::chrono::nanoseconds since_epoch(whole_seconds_ns + fraction_ns);
+  return TimePoint(std::chrono::duration_cast<TimePoint::duration>(since_epoch));
+}
+
+// Translate a POSIX `struct stat` into a FileStats (type, size and mtime).
+// Only regular files carry a size; anything that is neither a regular file
+// nor a directory is reported as FileType::Unknown.  The path is not set here.
+FileStats StatToFileStat(const struct stat& s) {
+  // Start from the "neither file nor directory" case and refine it
+  FileType type = FileType::Unknown;
+  int64_t size = kNoSize;
+  if (S_ISDIR(s.st_mode)) {
+    type = FileType::Directory;
+  } else if (S_ISREG(s.st_mode)) {
+    type = FileType::File;
+    size = static_cast<int64_t>(s.st_size);
+  }
+
+  FileStats result;
+  result.set_type(type);
+  result.set_size(size);
+#ifdef __APPLE__
+  // macOS doesn't use the POSIX-compliant spelling
+  result.set_mtime(ToTimePoint(s.st_mtimespec));
+#else
+  result.set_mtime(ToTimePoint(s.st_mtim));
+#endif
+  return result;
+}
+
+// Query type, size and modification time of `path` using stat().
+//
+// ENOENT, ENOTDIR and ELOOP are not treated as errors: `out` is then filled
+// with FileType::NonExistent, kNoTime and kNoSize.  Any other stat() failure
+// yields an IOError.  `out`'s path is set to `path` on every successful
+// return.
+Status StatFile(const std::string& path, FileStats* out) {
+  struct stat s;
+  if (stat(path.c_str(), &s) == -1) {
+    const bool missing = (errno == ENOENT || errno == ENOTDIR || errno == ELOOP);
+    if (!missing) {
+      // ErrnoToStatus appends the errno message right after the last
+      // argument, so end with ": " to keep the two parts apart.
+      return ErrnoToStatus("Failed stat()ing path '", path, "': ");
+    }
+    out->set_type(FileType::NonExistent);
+    out->set_mtime(kNoTime);
+    out->set_size(kNoSize);
+  } else {
+    *out = StatToFileStat(s);
+  }
+  out->set_path(path);
+  return Status::OK();
+}
+
+#endif
+
+// Append to `out` the stats of every entry found in directory `path`,
+// descending into sub-directories when `select.recursive` is set.
+//
+// If `select.allow_non_existent` is set and `path` does not exist, nothing is
+// appended and OK is returned.  Entries that StatFile reports as non-existent
+// are skipped.  Boost.Filesystem exceptions are turned into IOError statuses.
+Status StatSelector(const NativePathString& path, const Selector& select,
+                    std::vector<FileStats>* out) {
+  bfs::path p(path);
+
+  if (select.allow_non_existent) {
+    bfs::file_status st;
+    BOOST_FILESYSTEM_TRY
+    st = bfs::status(p);
+    BOOST_FILESYSTEM_CATCH
+    if (st.type() == bfs::file_not_found) {
+      return Status::OK();
+    }
+  }
+
+  BOOST_FILESYSTEM_TRY
+  for (const auto& entry : bfs::directory_iterator(p)) {
+    FileStats st;
+    NativePathString ns = entry.path().native();
+    RETURN_NOT_OK(StatFile(ns, &st));
+    // Read the type up front: `st` may be moved into `out` below and must
+    // not be inspected afterwards.
+    const FileType type = st.type();
+    if (type != FileType::NonExistent) {
+      out->push_back(std::move(st));
+    }
+    if (select.recursive && type == FileType::Directory) {
+      RETURN_NOT_OK(StatSelector(ns, select, out));
+    }
+  }
+  BOOST_FILESYSTEM_CATCH
+
+  return Status::OK();
+}
+
+}  // namespace
+
+// Nothing to set up or tear down.
+LocalFileSystem::LocalFileSystem() = default;
+
+LocalFileSystem::~LocalFileSystem() = default;
+
+// Stat a single path: convert it to the platform representation, then
+// delegate to StatFile.
+Status LocalFileSystem::GetTargetStats(const std::string& path, FileStats* out) {
+  PlatformFilename native_path;
+  Status conversion = PlatformFilename::FromString(path, &native_path);
+  if (!conversion.ok()) {
+    return conversion;
+  }
+  return StatFile(native_path.ToNative(), out);
+}
+
+// List the entries selected by `select`, starting at `select.base_dir`.
+// `out` is emptied first (once the base directory string has been converted)
+// and then filled by StatSelector.
+Status LocalFileSystem::GetTargetStats(const Selector& select,
+                                       std::vector<FileStats>* out) {
+  PlatformFilename base_dir;
+  RETURN_NOT_OK(PlatformFilename::FromString(select.base_dir, &base_dir));
+
+  out->clear();
+  const NativePathString& native_base = base_dir.ToNative();
+  return StatSelector(native_base, select, out);
+}
+
+// Create directory `path`, delegating to CreateDirTree when `recursive` is
+// true and to CreateDir otherwise.
+Status LocalFileSystem::CreateDir(const std::string& path, bool recursive) {
+  PlatformFilename dir_name;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &dir_name));
+  return recursive ? ::arrow::internal::CreateDirTree(dir_name)
+                   : ::arrow::internal::CreateDir(dir_name);
+}
+
+// Delete the directory tree at `path`.  Returns an IOError if DeleteDirTree
+// reports that nothing was deleted.
+Status LocalFileSystem::DeleteDir(const std::string& path) {
+  PlatformFilename dir_name;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &dir_name));
+
+  bool was_deleted = false;
+  RETURN_NOT_OK(::arrow::internal::DeleteDirTree(dir_name, &was_deleted));
+  if (!was_deleted) {
+    return Status::IOError("Directory does not exist: '", path, "'");
+  }
+  return Status::OK();
+}
+
+// Delete the file at `path`.  Returns an IOError if the underlying DeleteFile
+// helper reports that nothing was deleted.
+Status LocalFileSystem::DeleteFile(const std::string& path) {
+  PlatformFilename file_name;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &file_name));
+
+  bool was_deleted = false;
+  RETURN_NOT_OK(::arrow::internal::DeleteFile(file_name, &was_deleted));
+  if (!was_deleted) {
+    return Status::IOError("File does not exist: '", path, "'");
+  }
+  return Status::OK();
+}
+
+// Rename `src` to `dest`, using MoveFileExW (with MOVEFILE_REPLACE_EXISTING)
+// on Windows and rename() elsewhere.  Failures are reported as IOError with
+// the system error message appended.
+Status LocalFileSystem::Move(const std::string& src, const std::string& dest) {
+  PlatformFilename sfn, dfn;
+  RETURN_NOT_OK(PlatformFilename::FromString(src, &sfn));
+  RETURN_NOT_OK(PlatformFilename::FromString(dest, &dfn));
+
+  // NOTE: the error helpers read errno / GetLastError() only once they are
+  // entered, i.e. after all their arguments have been evaluated.  Pass the
+  // already-existing `src` / `dest` strings rather than calling ToString()
+  // in the argument list, so that no intervening call can overwrite the
+  // error code of the failed system call.
+#ifdef _WIN32
+  if (!MoveFileExW(sfn.ToNative().c_str(), dfn.ToNative().c_str(),
+                   MOVEFILE_REPLACE_EXISTING)) {
+    return WinErrorToStatus("Failed renaming '", src, "' to '", dest, "': ");
+  }
+#else
+  if (rename(sfn.ToNative().c_str(), dfn.ToNative().c_str()) == -1) {
+    return ErrnoToStatus("Failed renaming '", src, "' to '", dest, "': ");
+  }
+#endif
+  return Status::OK();
+}
+
+// Copy the file `src` to `dest`.
+//
+// Returns OK without doing anything when both paths convert to the same
+// native string.  On Windows the copy is done by CopyFileW (an existing
+// destination does not make it fail); elsewhere the contents are streamed
+// from an input stream to an output stream in 1 MiB chunks.
+Status LocalFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  PlatformFilename sfn, dfn;
+  RETURN_NOT_OK(PlatformFilename::FromString(src, &sfn));
+  RETURN_NOT_OK(PlatformFilename::FromString(dest, &dfn));
+  // XXX should we use fstat() to compare inodes?
+  if (sfn.ToNative() == dfn.ToNative()) {
+    return Status::OK();
+  }
+
+#ifdef _WIN32
+  if (!CopyFileW(sfn.ToNative().c_str(), dfn.ToNative().c_str(),
+                 FALSE /* bFailIfExists */)) {
+    // Pass the existing `src` / `dest` strings instead of calling ToString()
+    // in the argument list: WinErrorToStatus reads GetLastError() only after
+    // its arguments are evaluated, and an intervening call could overwrite it.
+    return WinErrorToStatus("Failed copying '", src, "' to '", dest, "': ");
+  }
+  return Status::OK();
+#else
+  std::shared_ptr<io::InputStream> is;
+  std::shared_ptr<io::OutputStream> os;
+  RETURN_NOT_OK(OpenInputStream(src, &is));
+  RETURN_NOT_OK(OpenOutputStream(dest, &os));
+  RETURN_NOT_OK(internal::CopyStream(is, os, 1024 * 1024 /* chunk_size */));
+  // Close the destination first so that write errors are reported
+  RETURN_NOT_OK(os->Close());
+  return is->Close();
+#endif
+}
+
+// Open `path` for sequential reading, backed by an io::ReadableFile.
+// `out` is only assigned on success.
+Status LocalFileSystem::OpenInputStream(const std::string& path,
+                                        std::shared_ptr<io::InputStream>* out) {
+  std::shared_ptr<io::ReadableFile> readable;
+  Status open_status = io::ReadableFile::Open(path, &readable);
+  if (open_status.ok()) {
+    *out = std::move(readable);
+  }
+  return open_status;
+}
+
+// Open `path` for random-access reading, backed by an io::ReadableFile.
+// `out` is only assigned on success.
+Status LocalFileSystem::OpenInputFile(const std::string& path,
+                                      std::shared_ptr<io::RandomAccessFile>* out) {
+  std::shared_ptr<io::ReadableFile> readable;
+  Status open_status = io::ReadableFile::Open(path, &readable);
+  if (open_status.ok()) {
+    *out = std::move(readable);
+  }
+  return open_status;
+}
+
+namespace {
+
+// Shared implementation of OpenOutputStream / OpenAppendStream: open `path`
+// write-only with the requested truncate / append behaviour and wrap the
+// resulting file descriptor in an io::FileOutputStream.
+Status OpenOutputStreamGeneric(const std::string& path, bool truncate, bool append,
+                               std::shared_ptr<io::OutputStream>* out) {
+  PlatformFilename file_name;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &file_name));
+
+  int fd;
+  const bool write_only = true;
+  RETURN_NOT_OK(
+      ::arrow::internal::FileOpenWritable(file_name, write_only, truncate, append, &fd));
+
+  Status wrap_status = io::FileOutputStream::Open(fd, out);
+  if (wrap_status.ok()) {
+    return wrap_status;
+  }
+  // Wrapping failed: close the descriptor here (best effort, the close
+  // result is deliberately ignored) and report the original failure.
+  ARROW_UNUSED(::arrow::internal::FileClose(fd));
+  return wrap_status;
+}
+
+}  // namespace
+
+// Open `path` for writing with truncation requested and append disabled.
+Status LocalFileSystem::OpenOutputStream(const std::string& path,
+                                         std::shared_ptr<io::OutputStream>* out) {
+  return OpenOutputStreamGeneric(path, /*truncate=*/true, /*append=*/false, out);
+}
+
+// Open `path` for writing with append requested and truncation disabled.
+Status LocalFileSystem::OpenAppendStream(const std::string& path,
+                                         std::shared_ptr<io::OutputStream>* out) {
+  return OpenOutputStreamGeneric(path, /*truncate=*/false, /*append=*/true, out);
+}
+
+}  // namespace fs
+}  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h
new file mode 100644
index 0000000..c720ac2
--- /dev/null
+++ b/cpp/src/arrow/filesystem/localfs.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/filesystem/filesystem.h"
+
+namespace arrow {
+namespace fs {
+
+/// \brief EXPERIMENTAL: a FileSystem implementation accessing files
+/// on the local machine.
+///
+/// Details such as symlinks are abstracted away (symlinks are always followed,
+/// except when deleting an entry).
+class ARROW_EXPORT LocalFileSystem : public FileSystem {
+ public:
+  LocalFileSystem();
+  ~LocalFileSystem() override;
+
+  // Also expose the GetTargetStats overloads of the base class that are
+  // not overridden here.
+  using FileSystem::GetTargetStats;
+  /// Get statistics for a single path.  A path that does not exist is
+  /// reported with FileType::NonExistent rather than as an error.
+  Status GetTargetStats(const std::string& path, FileStats* out) override;
+  /// Get statistics for the entries under `select.base_dir`.
+  /// `out` is cleared before being filled.
+  Status GetTargetStats(const Selector& select, std::vector<FileStats>* out) override;
+
+  /// Create a directory.  `recursive` selects between the directory-tree
+  /// and the single-directory creation helper.
+  Status CreateDir(const std::string& path, bool recursive = true) override;
+
+  /// Delete a directory tree.  Returns IOError if nothing was deleted.
+  Status DeleteDir(const std::string& path) override;
+
+  /// Delete a file.  Returns IOError if nothing was deleted.
+  Status DeleteFile(const std::string& path) override;
+
+  /// Rename `src` to `dest` using the platform's rename primitive.
+  Status Move(const std::string& src, const std::string& dest) override;
+
+  /// Copy the file `src` to `dest`.  Does nothing (and succeeds) when both
+  /// paths convert to the same native path string.
+  Status CopyFile(const std::string& src, const std::string& dest) override;
+
+  /// Open a file for sequential reading.
+  Status OpenInputStream(const std::string& path,
+                         std::shared_ptr<io::InputStream>* out) override;
+
+  /// Open a file for random-access reading.
+  Status OpenInputFile(const std::string& path,
+                       std::shared_ptr<io::RandomAccessFile>* out) override;
+
+  /// Open a file for writing, with truncation requested.
+  Status OpenOutputStream(const std::string& path,
+                          std::shared_ptr<io::OutputStream>* out) override;
+
+  /// Open a file for writing in append mode (no truncation).
+  Status OpenAppendStream(const std::string& path,
+                          std::shared_ptr<io::OutputStream>* out) override;
+};
+
+}  // namespace fs
+}  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/mockfs.cc b/cpp/src/arrow/filesystem/mockfs.cc
index 8b4ef2a..a5f936f 100644
--- a/cpp/src/arrow/filesystem/mockfs.cc
+++ b/cpp/src/arrow/filesystem/mockfs.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <iterator>
 #include <map>
 #include <string>
@@ -529,9 +530,23 @@ Status MockFileSystem::Move(const std::string& src, const std::string& dest) {
     if (op.src_entry == nullptr) {
       return PathNotFound(src);
     }
-    if (op.dest_entry != nullptr && op.dest_entry->is_dir()) {
-      return Status::IOError("Cannot replace destination '", dest,
-                             "', which is a directory");
+    if (op.dest_entry != nullptr) {
+      if (op.dest_entry->is_dir()) {
+        return Status::IOError("Cannot replace destination '", dest,
+                               "', which is a directory");
+      }
+      if (op.dest_entry->is_file() && op.src_entry->is_dir()) {
+        return Status::IOError("Cannot replace destination '", dest,
+                               "', which is a file, with directory '", src, "'");
+      }
+    }
+    if (op.src_parts.size() < op.dest_parts.size()) {
+      // Check if dest is a child of src
+      auto p =
+          std::mismatch(op.src_parts.begin(), op.src_parts.end(), op.dest_parts.begin());
+      if (p.first == op.src_parts.end()) {
+        return Status::IOError("Cannot move '", src, "' into child path '", dest, "'");
+      }
     }
 
     // Move original entry, fix its name
diff --git a/cpp/src/arrow/filesystem/path-util.cc b/cpp/src/arrow/filesystem/path-util.cc
index bbab3ab..c30e6b2 100644
--- a/cpp/src/arrow/filesystem/path-util.cc
+++ b/cpp/src/arrow/filesystem/path-util.cc
@@ -23,7 +23,7 @@ namespace arrow {
 namespace fs {
 namespace internal {
 
-static constexpr char kSep = '/';
+// XXX How does this encode Windows UNC paths?
 
 std::vector<std::string> SplitAbstractPath(const std::string& path) {
   std::vector<std::string> parts;
@@ -81,7 +81,23 @@ Status ValidateAbstractPathParts(const std::vector<std::string>& parts) {
 
 std::string ConcatAbstractPath(const std::string& base, const std::string& stem) {
   DCHECK(!stem.empty());
-  return base.empty() ? stem : base + kSep + stem;
+  if (base.empty()) {
+    return stem;
+  } else if (base.back() == kSep) {
+    return base + stem;
+  } else {
+    return base + kSep + stem;
+  }
+}
+
+std::string EnsureTrailingSlash(const std::string& s) {
+  if (s.length() > 0 && s.back() != kSep) {
+    // XXX How about "C:" on Windows?  We probably don't want to turn it into "C:/"...
+    // Unless the local filesystem always uses absolute paths
+    return s + kSep;
+  } else {
+    return s;
+  }
 }
 
 }  // namespace internal
diff --git a/cpp/src/arrow/filesystem/path-util.h b/cpp/src/arrow/filesystem/path-util.h
index 85b81c2..444451d 100644
--- a/cpp/src/arrow/filesystem/path-util.h
+++ b/cpp/src/arrow/filesystem/path-util.h
@@ -27,6 +27,8 @@ namespace arrow {
 namespace fs {
 namespace internal {
 
+constexpr char kSep = '/';
+
 // Computations on abstract paths (not local paths with system-dependent behaviour).
 // Abstract paths are typically used in URIs.
 
@@ -47,13 +49,16 @@ Status ValidateAbstractPathParts(const std::vector<std::string>& parts);
 ARROW_EXPORT
 std::string ConcatAbstractPath(const std::string& base, const std::string& stem);
 
+ARROW_EXPORT
+std::string EnsureTrailingSlash(const std::string& s);
+
 // Join the components of an abstract path.
 template <class StringIt>
 std::string JoinAbstractPath(StringIt it, StringIt end) {
   std::string path;
   for (; it != end; ++it) {
     if (!path.empty()) {
-      path += '/';
+      path += kSep;
     }
     path += *it;
   }
diff --git a/cpp/src/arrow/filesystem/test-util.cc b/cpp/src/arrow/filesystem/test-util.cc
index d87fbc5..27fc69b 100644
--- a/cpp/src/arrow/filesystem/test-util.cc
+++ b/cpp/src/arrow/filesystem/test-util.cc
@@ -32,8 +32,6 @@ namespace fs {
 
 namespace {
 
-static constexpr double kTimeSlack = 2.0;  // In seconds
-
 std::vector<FileStats> GetAllWithType(FileSystem* fs, FileType type) {
   Selector selector;
   selector.base_dir = "";
@@ -57,20 +55,12 @@ std::vector<FileStats> GetAllFiles(FileSystem* fs) {
   return GetAllWithType(fs, FileType::File);
 }
 
-// Sort of vector of FileStats by lexicographic path order
-void SortStats(std::vector<FileStats>* stats) {
-  std::sort(stats->begin(), stats->end(),
-            [](const FileStats& left, const FileStats& right) -> bool {
-              return left.path() < right.path();
-            });
-}
-
 void AssertPaths(const std::vector<FileStats>& stats,
                  const std::vector<std::string>& expected_paths) {
   auto sorted_stats = stats;
   SortStats(&sorted_stats);
-  std::vector<std::string> paths(stats.size());
-  std::transform(stats.begin(), stats.end(), paths.begin(),
+  std::vector<std::string> paths(sorted_stats.size());
+  std::transform(sorted_stats.begin(), sorted_stats.end(), paths.begin(),
                  [&](const FileStats& st) { return st.path(); });
 
   ASSERT_EQ(paths, expected_paths);
@@ -88,6 +78,30 @@ Status WriteString(io::OutputStream* stream, const std::string& s) {
   return stream->Write(s.data(), static_cast<int64_t>(s.length()));
 }
 
+// Assert that `path` is a regular file in `fs` whose size and contents both
+// match `expected_data`, and that nothing can be read past those contents.
+void AssertFileContents(FileSystem* fs, const std::string& path,
+                        const std::string& expected_data) {
+  const auto expected_size = static_cast<int64_t>(expected_data.length());
+
+  FileStats info;
+  ASSERT_OK(fs->GetTargetStats(path, &info));
+  ASSERT_EQ(info.type(), FileType::File) << "For path '" << path << "'";
+  ASSERT_EQ(info.size(), expected_size) << "For path '" << path << "'";
+
+  std::shared_ptr<io::InputStream> input;
+  ASSERT_OK(fs->OpenInputStream(path, &input));
+
+  std::shared_ptr<Buffer> contents;
+  ASSERT_OK(input->Read(info.size(), &contents));
+  AssertBufferEqual(*contents, expected_data);
+
+  // No data left in stream
+  std::shared_ptr<Buffer> trailing;
+  ASSERT_OK(input->Read(1, &trailing));
+  ASSERT_EQ(trailing->size(), 0);
+
+  ASSERT_OK(input->Close());
+}
+
+// Assert that a time point does not lie before the clock's epoch.
+void ValidateTimePoint(TimePoint tp) {
+  const auto ticks_since_epoch = tp.time_since_epoch().count();
+  ASSERT_GE(ticks_since_epoch, 0);
+}
+
+};  // namespace
+
 void CreateFile(FileSystem* fs, const std::string& path, const std::string& data) {
   std::shared_ptr<io::OutputStream> stream;
   ASSERT_OK(fs->OpenOutputStream(path, &stream));
@@ -95,12 +109,31 @@ void CreateFile(FileSystem* fs, const std::string& path, const std::string& data
   ASSERT_OK(stream->Close());
 }
 
+// Sort a vector of FileStats in place, by lexicographic path order.
+void SortStats(std::vector<FileStats>* stats) {
+  const auto by_path = [](const FileStats& lhs, const FileStats& rhs) -> bool {
+    return lhs.path() < rhs.path();
+  };
+  std::sort(stats->begin(), stats->end(), by_path);
+}
+
 void AssertFileStats(const FileStats& st, const std::string& path, FileType type) {
   ASSERT_EQ(st.path(), path);
   ASSERT_EQ(st.type(), type);
 }
 
 void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+                     TimePoint mtime) {
+  AssertFileStats(st, path, type);
+  ASSERT_EQ(st.mtime(), mtime);
+}
+
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+                     TimePoint mtime, int64_t size) {
+  AssertFileStats(st, path, type, mtime);
+  ASSERT_EQ(st.size(), size);
+}
+
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
                      int64_t size) {
   AssertFileStats(st, path, type);
   ASSERT_EQ(st.size(), size);
@@ -113,36 +146,6 @@ void AssertFileStats(FileSystem* fs, const std::string& path, Args&&... args) {
   AssertFileStats(st, path, std::forward<Args>(args)...);
 }
 
-void AssertFileContents(FileSystem* fs, const std::string& path,
-                        const std::string& expected_data) {
-  FileStats st;
-  ASSERT_OK(fs->GetTargetStats(path, &st));
-  ASSERT_EQ(st.type(), FileType::File);
-  ASSERT_EQ(st.size(), static_cast<int64_t>(expected_data.length()));
-
-  std::shared_ptr<io::InputStream> stream;
-  std::shared_ptr<Buffer> buffer, leftover;
-  ASSERT_OK(fs->OpenInputStream(path, &stream));
-  ASSERT_OK(stream->Read(st.size(), &buffer));
-  AssertBufferEqual(*buffer, expected_data);
-  // No data left in stream
-  ASSERT_OK(stream->Read(1, &leftover));
-  ASSERT_EQ(leftover->size(), 0);
-
-  ASSERT_OK(stream->Close());
-}
-
-void ValidateTimePoint(TimePoint tp) { ASSERT_GE(tp.time_since_epoch().count(), 0); }
-
-template <typename Duration>
-void AssertDurationBetween(Duration d, double min_secs, double max_secs) {
-  auto seconds = std::chrono::duration_cast<std::chrono::duration<double>>(d);
-  ASSERT_GE(seconds.count(), min_secs);
-  ASSERT_LE(seconds.count(), max_secs);
-}
-
-};  // namespace
-
 ////////////////////////////////////////////////////////////////////////////
 // GenericFileSystemTest implementation
 
@@ -351,31 +354,31 @@ void GenericFileSystemTest::TestMoveDir(FileSystem* fs) {
   AssertAllDirs(fs, {"EF", "KL", "KL/CD"});
   AssertAllFiles(fs, {"EF/ghi", "KL/CD/def", "KL/abc"});
 
-  // Destination is a file => clobber
-  CreateFile(fs, "MN", "");
-  ASSERT_OK(fs->Move("KL", "MN"));
-  AssertAllDirs(fs, {"EF", "MN", "MN/CD"});
-  AssertAllFiles(fs, {"EF/ghi", "MN/CD/def", "MN/abc"});
+  // Overwrite file with directory => untested (implementation-dependent)
 
   // Identical source and destination: allowed to succeed or raise IOError,
   // but should not lose data.
-  Status st = fs->Move("MN", "MN");
+  Status st = fs->Move("KL", "KL");
   if (!st.ok()) {
     ASSERT_RAISES(IOError, st);
   }
-  AssertAllDirs(fs, {"EF", "MN", "MN/CD"});
-  AssertAllFiles(fs, {"EF/ghi", "MN/CD/def", "MN/abc"});
+  AssertAllDirs(fs, {"EF", "KL", "KL/CD"});
+  AssertAllFiles(fs, {"EF/ghi", "KL/CD/def", "KL/abc"});
 
-  // Destination is a directory
-  ASSERT_RAISES(IOError, fs->Move("MN", "EF"));
-  AssertAllDirs(fs, {"EF", "MN", "MN/CD"});
-  AssertAllFiles(fs, {"EF/ghi", "MN/CD/def", "MN/abc"});
+  // Destination is a non-empty directory
+  ASSERT_RAISES(IOError, fs->Move("KL", "EF"));
+  AssertAllDirs(fs, {"EF", "KL", "KL/CD"});
+  AssertAllFiles(fs, {"EF/ghi", "KL/CD/def", "KL/abc"});
+
+  // Cannot move directory inside itself
+  ASSERT_RAISES(IOError, fs->Move("KL", "KL/ZZ"));
 
   // (other errors tested in TestMoveFile)
 
-  // File contents didn't change
-  AssertFileContents(fs, "MN/abc", "abc data");
-  AssertFileContents(fs, "MN/CD/def", "def data");
+  // Contents didn't change
+  AssertAllDirs(fs, {"EF", "KL", "KL/CD"});
+  AssertFileContents(fs, "KL/abc", "abc data");
+  AssertFileContents(fs, "KL/CD/def", "def data");
 }
 
 void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
@@ -417,7 +420,7 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
   AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
   AssertFileContents(fs, "def", "other data");
 
-  // Destination is a directory
+  // Destination is a non-empty directory
   ASSERT_RAISES(IOError, fs->CopyFile("def", "AB"));
   // Source doesn't exist
   ASSERT_RAISES(IOError, fs->CopyFile("abc", "xxx"));
@@ -430,7 +433,7 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
 }
 
 void GenericFileSystemTest::TestGetTargetStatsSingle(FileSystem* fs) {
-  ASSERT_OK(fs->CreateDir("AB/CD"));
+  ASSERT_OK(fs->CreateDir("AB/CD/EF"));
   CreateFile(fs, "AB/CD/ghi", "some data");
 
   FileStats st;
@@ -442,11 +445,12 @@ void GenericFileSystemTest::TestGetTargetStatsSingle(FileSystem* fs) {
   first_time = st.mtime();
   ValidateTimePoint(first_time);
 
-  ASSERT_OK(fs->GetTargetStats("AB/CD", &st));
-  AssertFileStats(st, "AB/CD", FileType::Directory);
-  ASSERT_EQ(st.base_name(), "CD");
+  ASSERT_OK(fs->GetTargetStats("AB/CD/EF", &st));
+  AssertFileStats(st, "AB/CD/EF", FileType::Directory);
+  ASSERT_EQ(st.base_name(), "EF");
   ASSERT_EQ(st.size(), kNoSize);
-  // AB/CD was created a little after AB
+  // AB/CD's creation can impact AB's modification time, however, AB/CD/EF's
+  // creation doesn't, so AB/CD/EF's mtime should be after AB's.
   AssertDurationBetween(st.mtime() - first_time, 0.0, kTimeSlack);
 
   ASSERT_OK(fs->GetTargetStats("AB/CD/ghi", &st));
@@ -459,10 +463,6 @@ void GenericFileSystemTest::TestGetTargetStatsSingle(FileSystem* fs) {
   ASSERT_EQ(st.base_name(), "zz");
   ASSERT_EQ(st.size(), kNoSize);
   ASSERT_EQ(st.mtime(), kNoTime);
-
-  // Invalid path
-  // XXX will this really be rejected by all filesystems?
-  ASSERT_RAISES(Invalid, fs->GetTargetStats("//foo//bar//baz//", &st));
 }
 
 void GenericFileSystemTest::TestGetTargetStatsVector(FileSystem* fs) {
@@ -491,11 +491,6 @@ void GenericFileSystemTest::TestGetTargetStatsVector(FileSystem* fs) {
   ASSERT_OK(fs->GetTargetStats("AB", &st));
   AssertFileStats(st, "AB", FileType::Directory);
   ASSERT_EQ(st.mtime(), first_time);
-
-  // Invalid path
-  // XXX will this really be rejected by all filesystems?
-  ASSERT_RAISES(Invalid,
-                fs->GetTargetStats({"AB", "AB/CD", "//foo//bar//baz//"}, &stats));
 }
 
 void GenericFileSystemTest::TestGetTargetStatsSelector(FileSystem* fs) {
@@ -603,6 +598,8 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) {
   AssertAllFiles(fs, {"CD/ghi", "abc"});
   AssertFileContents(fs, "CD/ghi", "overwritten");
 
+  ASSERT_RAISES(Invalid, WriteString(stream.get(), "x"));  // Stream is closed
+
   // Cannot turn dir into file
   ASSERT_RAISES(IOError, fs->OpenOutputStream("CD", &stream));
   AssertAllDirs(fs, {"CD"});
@@ -632,6 +629,8 @@ void GenericFileSystemTest::TestOpenAppendStream(FileSystem* fs) {
   AssertAllDirs(fs, {});
   AssertAllFiles(fs, {"abc"});
   AssertFileContents(fs, "abc", "some data appended");
+
+  ASSERT_RAISES(Invalid, WriteString(stream.get(), "x"));  // Stream is closed
 }
 
 void GenericFileSystemTest::TestOpenInputStream(FileSystem* fs) {
diff --git a/cpp/src/arrow/filesystem/test-util.h b/cpp/src/arrow/filesystem/test-util.h
index 8230501..179b08c 100644
--- a/cpp/src/arrow/filesystem/test-util.h
+++ b/cpp/src/arrow/filesystem/test-util.h
@@ -17,13 +17,47 @@
 
 #pragma once
 
+#include <chrono>
 #include <memory>
+#include <string>
+#include <vector>
 
 #include "arrow/filesystem/filesystem.h"
 
 namespace arrow {
 namespace fs {
 
+static constexpr double kTimeSlack = 2.0;  // In seconds
+
+ARROW_EXPORT
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type);
+
+ARROW_EXPORT
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+                     TimePoint mtime);
+
+ARROW_EXPORT
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+                     TimePoint mtime, int64_t size);
+
+ARROW_EXPORT
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+                     int64_t size);
+
+ARROW_EXPORT
+void CreateFile(FileSystem* fs, const std::string& path, const std::string& data);
+
+// Sort a vector of FileStats by lexicographic path order
+ARROW_EXPORT
+void SortStats(std::vector<FileStats>* stats);
+
+// Assert that duration `d`, expressed in (fractional) seconds, lies within
+// the closed interval [min_secs, max_secs].
+template <typename Duration>
+void AssertDurationBetween(Duration d, double min_secs, double max_secs) {
+  using FloatSeconds = std::chrono::duration<double>;
+  const double elapsed = std::chrono::duration_cast<FloatSeconds>(d).count();
+  ASSERT_GE(elapsed, min_secs);
+  ASSERT_LE(elapsed, max_secs);
+}
+
 // Generic tests for FileSystem implementations.
 // To use this class, subclass both from it and ::testing::Test,
 // implement GetEmptyFileSystem(), and use GENERIC_FS_TEST_FUNCTIONS()
diff --git a/cpp/src/arrow/filesystem/util-internal.cc b/cpp/src/arrow/filesystem/util-internal.cc
new file mode 100644
index 0000000..d097f91
--- /dev/null
+++ b/cpp/src/arrow/filesystem/util-internal.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/filesystem/util-internal.h"
+#include "arrow/buffer.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+// Copy everything readable from `src` into `dest`, going through a single
+// scratch buffer of `chunk_size` bytes.  Copying stops at the first read
+// that returns zero bytes.  Neither stream is closed here.
+Status CopyStream(const std::shared_ptr<io::InputStream>& src,
+                  const std::shared_ptr<io::OutputStream>& dest, int64_t chunk_size) {
+  std::shared_ptr<Buffer> scratch;
+  RETURN_NOT_OK(AllocateBuffer(chunk_size, &scratch));
+
+  int64_t bytes_read = 0;
+  do {
+    RETURN_NOT_OK(src->Read(chunk_size, &bytes_read, scratch->mutable_data()));
+    if (bytes_read != 0) {
+      RETURN_NOT_OK(dest->Write(scratch->data(), bytes_read));
+    }
+  } while (bytes_read != 0);  // a zero-byte read means EOF
+
+  return Status::OK();
+}
+
+}  // namespace internal
+}  // namespace fs
+}  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/util-internal.h b/cpp/src/arrow/filesystem/util-internal.h
new file mode 100644
index 0000000..eabdad4
--- /dev/null
+++ b/cpp/src/arrow/filesystem/util-internal.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+ARROW_EXPORT
+Status CopyStream(const std::shared_ptr<io::InputStream>& src,
+                  const std::shared_ptr<io::OutputStream>& dest, int64_t chunk_size);
+
+}  // namespace internal
+}  // namespace fs
+}  // namespace arrow
diff --git a/cpp/src/arrow/io/file-test.cc b/cpp/src/arrow/io/file-test.cc
index c548785..1f0336e 100644
--- a/cpp/src/arrow/io/file-test.cc
+++ b/cpp/src/arrow/io/file-test.cc
@@ -145,6 +145,7 @@ TEST_F(TestFileOutputStream, Close) {
   ASSERT_OK(file_->Close());
   ASSERT_TRUE(file_->closed());
   ASSERT_TRUE(FileIsClosed(fd));
+  ASSERT_RAISES(Invalid, file_->Write(data, strlen(data)));
 
   // Idempotent
   ASSERT_OK(file_->Close());
@@ -158,6 +159,7 @@ TEST_F(TestFileOutputStream, Close) {
   ASSERT_OK(stream_->Close());
   ASSERT_TRUE(stream_->closed());
   ASSERT_TRUE(FileIsClosed(fd));
+  ASSERT_RAISES(Invalid, stream_->Write(data, strlen(data)));
 
   // Idempotent
   ASSERT_OK(stream_->Close());
@@ -409,6 +411,9 @@ TEST_F(TestReadableFile, Read) {
   ASSERT_OK(file_->Seek(1));
   ASSERT_OK(file_->Read(size, &buf));
   ASSERT_EQ(size - 1, buf->size());
+
+  ASSERT_OK(file_->Close());
+  ASSERT_RAISES(Invalid, file_->Read(1, &buf));
 }
 
 TEST_F(TestReadableFile, ReadAt) {
@@ -436,6 +441,9 @@ TEST_F(TestReadableFile, ReadAt) {
 
   Buffer expected(reinterpret_cast<const uint8_t*>(test_data + 2), 5);
   ASSERT_TRUE(buffer2->Equals(expected));
+
+  ASSERT_OK(file_->Close());
+  ASSERT_RAISES(Invalid, file_->ReadAt(0, 1, &buffer2));
 }
 
 TEST_F(TestReadableFile, NonExistentFile) {
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 7c347f1..e678ca2 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -113,35 +113,52 @@ class OSFile {
     return Status::OK();
   }
 
+  // Returns Status::Invalid once the file has been closed, Status::OK otherwise.
+  // The I/O methods below call this before touching fd_.
+  Status CheckClosed() const {
+    return is_open_ ? Status::OK()
+                    : Status::Invalid("Invalid operation on closed file");
+  }
+
   Status Close() {
     if (is_open_) {
       // Even if closing fails, the fd will likely be closed (perhaps it's
       // already closed).
       is_open_ = false;
-      RETURN_NOT_OK(internal::FileClose(fd_));
+      int fd = fd_;
+      fd_ = -1;
+      RETURN_NOT_OK(internal::FileClose(fd));
     }
     return Status::OK();
   }
 
   Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+    RETURN_NOT_OK(CheckClosed());
     return internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes, bytes_read);
   }
 
   Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    RETURN_NOT_OK(CheckClosed());
     return internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position, nbytes,
                                 bytes_read);
   }
 
   Status Seek(int64_t pos) {
+    RETURN_NOT_OK(CheckClosed());
     if (pos < 0) {
       return Status::Invalid("Invalid position");
     }
     return internal::FileSeek(fd_, pos);
   }
 
-  Status Tell(int64_t* pos) const { return internal::FileTell(fd_, pos); }
+  Status Tell(int64_t* pos) const {
+    RETURN_NOT_OK(CheckClosed());
+    return internal::FileTell(fd_, pos);
+  }
 
   Status Write(const void* data, int64_t length) {
+    RETURN_NOT_OK(CheckClosed());
+
     std::lock_guard<std::mutex> guard(lock_);
     if (length < 0) {
       return Status::IOError("Length must be non-negative");
diff --git a/cpp/src/arrow/util/io-util-test.cc b/cpp/src/arrow/util/io-util-test.cc
index 0f4f6ca..68d3f30 100644
--- a/cpp/src/arrow/util/io-util-test.cc
+++ b/cpp/src/arrow/util/io-util-test.cc
@@ -35,6 +35,48 @@ void AssertNotExists(const PlatformFilename& path) {
   ASSERT_FALSE(exists) << "Path '" << path.ToString() << "' exists";
 }
 
+TEST(PlatformFilename, RoundtripAscii) {
+  PlatformFilename fn;
+  ASSERT_OK(PlatformFilename::FromString("a/b", &fn));
+  ASSERT_EQ(fn.ToString(), "a/b");
+#if _WIN32
+  ASSERT_EQ(fn.ToNative(), L"a\\b");
+#else
+  ASSERT_EQ(fn.ToNative(), "a/b");
+#endif
+}
+
+TEST(PlatformFilename, RoundtripUtf8) {
+  PlatformFilename fn;
+  ASSERT_OK(PlatformFilename::FromString("h\xc3\xa9h\xc3\xa9", &fn));
+  ASSERT_EQ(fn.ToString(), "h\xc3\xa9h\xc3\xa9");
+#if _WIN32
+  ASSERT_EQ(fn.ToNative(), L"h\u00e9h\u00e9");
+#else
+  ASSERT_EQ(fn.ToNative(), "h\xc3\xa9h\xc3\xa9");
+#endif
+}
+
+#if _WIN32
+TEST(PlatformFilename, Separators) {
+  PlatformFilename fn;
+  ASSERT_OK(PlatformFilename::FromString("C:/foo/bar", &fn));
+  ASSERT_EQ(fn.ToString(), "C:/foo/bar");
+  ASSERT_EQ(fn.ToNative(), L"C:\\foo\\bar");
+
+  ASSERT_OK(PlatformFilename::FromString("C:\\foo\\bar", &fn));
+  ASSERT_EQ(fn.ToString(), "C:/foo/bar");
+  ASSERT_EQ(fn.ToNative(), L"C:\\foo\\bar");
+}
+#endif
+
+TEST(PlatformFilename, Invalid) {
+  // A file name with an embedded NUL byte must be rejected.
+  const std::string name_with_nul("foo\0", 4);
+  PlatformFilename fn;
+  ASSERT_RAISES(Invalid, PlatformFilename::FromString(name_with_nul, &fn));
+}
+
 TEST(CreateDirDeleteDir, Basics) {
   const std::string BASE = "xxx-io-util-test-dir";
   bool created, deleted;
@@ -74,15 +116,15 @@ TEST(CreateDirDeleteDir, Basics) {
 }
 
 TEST(TemporaryDir, Basics) {
-  std::unique_ptr<TemporaryDir> dir;
+  std::unique_ptr<TemporaryDir> temp_dir;
   PlatformFilename fn;
 
-  ASSERT_OK(TemporaryDir::Make("some-prefix-", &dir));
-  fn = dir->path();
+  ASSERT_OK(TemporaryDir::Make("some-prefix-", &temp_dir));
+  fn = temp_dir->path();
   // Path has a trailing separator, for convenience
   ASSERT_EQ(fn.ToString().back(), '/');
 #if defined(_WIN32)
-  ASSERT_EQ(fn.ToNative().back(), L'/');
+  ASSERT_EQ(fn.ToNative().back(), L'\\');
 #else
   ASSERT_EQ(fn.ToNative().back(), '/');
 #endif
@@ -98,19 +140,41 @@ TEST(TemporaryDir, Basics) {
   ASSERT_OK(CreateDir(child));
   AssertExists(child);
 
-  dir.reset();
+  temp_dir.reset();
   AssertNotExists(fn);
   AssertNotExists(child);
 }
 
+TEST(CreateDirTree, Basics) {
+  std::unique_ptr<TemporaryDir> temp_dir;
+  PlatformFilename fn;
+  bool created;
+
+  ASSERT_OK(TemporaryDir::Make("io-util-test-", &temp_dir));
+
+  ASSERT_OK(temp_dir->path().Join("AB/CD", &fn));
+  ASSERT_OK(CreateDirTree(fn, &created));
+  ASSERT_TRUE(created);
+  ASSERT_OK(CreateDirTree(fn, &created));
+  ASSERT_FALSE(created);
+
+  ASSERT_OK(temp_dir->path().Join("AB", &fn));
+  ASSERT_OK(CreateDirTree(fn, &created));
+  ASSERT_FALSE(created);
+
+  ASSERT_OK(temp_dir->path().Join("EF", &fn));
+  ASSERT_OK(CreateDirTree(fn, &created));
+  ASSERT_TRUE(created);
+}
+
 TEST(DeleteFile, Basics) {
-  std::unique_ptr<TemporaryDir> dir;
+  std::unique_ptr<TemporaryDir> temp_dir;
   PlatformFilename fn;
   int fd;
   bool deleted;
 
-  ASSERT_OK(TemporaryDir::Make("io-util-test-", &dir));
-  ASSERT_OK(dir->path().Join("test-file", &fn));
+  ASSERT_OK(TemporaryDir::Make("io-util-test-", &temp_dir));
+  ASSERT_OK(temp_dir->path().Join("test-file", &fn));
 
   AssertNotExists(fn);
   ASSERT_OK(FileOpenWritable(fn, true /* write_only */, true /* truncate */,
@@ -125,7 +189,7 @@ TEST(DeleteFile, Basics) {
   AssertNotExists(fn);
 
   // Cannot call DeleteFile on directory
-  ASSERT_OK(dir->path().Join("test-dir", &fn));
+  ASSERT_OK(temp_dir->path().Join("test-temp_dir", &fn));
   ASSERT_OK(CreateDir(fn));
   AssertExists(fn);
   ASSERT_RAISES(IOError, DeleteFile(fn));
diff --git a/cpp/src/arrow/util/io-util.cc b/cpp/src/arrow/util/io-util.cc
index cb1237c..8ff4361 100644
--- a/cpp/src/arrow/util/io-util.cc
+++ b/cpp/src/arrow/util/io-util.cc
@@ -30,6 +30,7 @@
 #include <cstring>
 #include <iostream>
 #include <random>
+#include <sstream>
 #include <string>
 #include <utility>
 
@@ -73,11 +74,6 @@
 #include <unistd.h>
 #endif
 
-// POSIX systems do not have this
-#ifndef O_BINARY
-#define O_BINARY 0
-#endif
-
 // define max read/write count
 #if defined(_WIN32)
 #define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
@@ -185,6 +181,27 @@ namespace internal {
 
 namespace bfs = ::boost::filesystem;
 
+namespace {
+
+#if _WIN32
+using NativePathCodeCvt = std::codecvt_utf8_utf16<wchar_t>;
+#endif
+
+Status StringToNative(const std::string& s, NativePathString* out) {
+#if defined(_WIN32)
+  try {  // Windows: decode `s` (UTF-8) into the wide native string
+    *out = std::wstring_convert<NativePathCodeCvt>{}.from_bytes(s);
+  } catch (const std::range_error& e) {
+    return Status::Invalid(e.what());  // from_bytes() could not convert `s`
+  }
+#else
+  *out = s;  // elsewhere the native string type is std::string: plain copy
+#endif
+  return Status::OK();
+}
+
+}  // namespace
+
 #define BOOST_FILESYSTEM_TRY try {
 #define BOOST_FILESYSTEM_CATCH           \
   }                                      \
@@ -212,11 +229,31 @@ static std::string MakeRandomName(int num_chars) {
   return s;
 }
 
+std::string ErrnoMessage(int errnum) { return std::strerror(errnum); }
+
+#if _WIN32
+std::string WinErrorMessage(int errnum) {
+  char buf[1024];
+  auto nchars = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
+                               NULL, errnum, 0, buf, sizeof(buf), NULL);
+  if (nchars == 0) {
+    // FormatMessageA produced no text: fall back to the bare numeric code
+    std::stringstream ss;
+    ss << "Windows error #" << errnum;
+    return ss.str();
+  }
+  return std::string(buf, nchars);
+}
+#endif
+
 //
 // PlatformFilename implementation
 //
 
 struct PlatformFilename::Impl {
+  Impl() = default;
+  explicit Impl(bfs::path p) : path(p.make_preferred()) {}
+
   bfs::path path;
 };
 
@@ -244,33 +281,25 @@ PlatformFilename& PlatformFilename::operator=(PlatformFilename&& other) {
   return *this;
 }
 
-PlatformFilename::PlatformFilename(const PlatformFilename::NativePathString& path)
+PlatformFilename::PlatformFilename(const NativePathString& path)
     : PlatformFilename(Impl{path}) {}
 
-const PlatformFilename::NativePathString& PlatformFilename::ToNative() const {
+const NativePathString& PlatformFilename::ToNative() const {
   return impl_->path.native();
 }
 
-std::string PlatformFilename::ToString() const { return impl_->path.string(); }
-
-namespace {
-
-Status StringToNative(const std::string& s, PlatformFilename::NativePathString* out) {
-#if defined(_WIN32)
-  try {
-    *out = std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(s);
-  } catch (std::range_error& e) {
-    return Status::Invalid(e.what());
-  }
+std::string PlatformFilename::ToString() const {
+#if _WIN32
+  return impl_->path.generic_string(NativePathCodeCvt());
 #else
-  *out = s;
+  return impl_->path.generic_string();
 #endif
-  return Status::OK();
 }
 
-}  // namespace
-
 Status PlatformFilename::FromString(const std::string& file_name, PlatformFilename* out) {
+  if (file_name.find_first_of('\0') != std::string::npos) {
+    return Status::Invalid("Embedded NUL char in file name: '", file_name, "'");
+  }
   NativePathString ns;
   RETURN_NOT_OK(StringToNative(file_name, &ns));
   *out = PlatformFilename(std::move(ns));
@@ -297,9 +326,26 @@ Status CreateDir(const PlatformFilename& dir_path, bool* created) {
   return Status::OK();
 }
 
+Status CreateDirTree(const PlatformFilename& dir_path, bool* created) {
+  // Runs bfs::create_directories on `dir_path`; `created` (optional) receives
+  // its boolean result. Filesystem exceptions are handled by the CATCH macro.
+  bool was_created = false;
+  BOOST_FILESYSTEM_TRY
+  was_created = bfs::create_directories(dir_path.impl_->path);
+  BOOST_FILESYSTEM_CATCH
+  if (created != nullptr) *created = was_created;
+  return Status::OK();
+}
+
 Status DeleteDirTree(const PlatformFilename& dir_path, bool* deleted) {
   BOOST_FILESYSTEM_TRY
-  auto n_removed = bfs::remove_all(dir_path.impl_->path);
+  const auto& path = dir_path.impl_->path;
+  // XXX Race: the path may change between this check and remove_all() below.
+  auto st = bfs::symlink_status(path);
+  if (st.type() != bfs::file_not_found && st.type() != bfs::directory_file) {
+    return Status::IOError("Cannot delete non-directory '", path.string(), "'");
+  }
+  auto n_removed = bfs::remove_all(path);
   if (deleted) {
     *deleted = n_removed != 0;
   }
@@ -317,7 +363,7 @@ Status DeleteFile(const PlatformFilename& file_path, bool* deleted) {
   if (!bfs::is_directory(st)) {
     res = bfs::remove(path);
   } else {
-    return Status::IOError("Cannot delete directory '", path.string());
+    return Status::IOError("Cannot delete directory '", path.string(), "'");
   }
   if (deleted) {
     *deleted = res;
@@ -360,8 +406,8 @@ static inline Status CheckFileOpResult(int ret, int errno_actual,
                                        const PlatformFilename& file_name,
                                        const char* opname) {
   if (ret == -1) {
-    return Status::IOError("Failed to ", opname, " file: ", file_name.ToString(),
-                           " , error: ", std::strerror(errno_actual));
+    return Status::IOError("Failed to ", opname, " file '", file_name.ToString(),
+                           "', error: ", ErrnoMessage(errno_actual));
   }
   return Status::OK();
 }
@@ -373,8 +419,22 @@ Status FileOpenReadable(const PlatformFilename& file_name, int* fd) {
                            _O_RDONLY | _O_BINARY | _O_NOINHERIT, _SH_DENYNO, _S_IREAD);
   ret = *fd;
 #else
-  ret = *fd = open(file_name.ToNative().c_str(), O_RDONLY | O_BINARY);
+  ret = *fd = open(file_name.ToNative().c_str(), O_RDONLY);
   errno_actual = errno;
+
+  if (ret >= 0) {
+    // open(O_RDONLY) succeeds on directories, check for it
+    struct stat st;
+    ret = fstat(*fd, &st);
+    if (ret == -1) {
+      errno_actual = errno;  // report fstat's error below, not open()'s stale errno
+      ARROW_UNUSED(FileClose(*fd));
+    } else if (S_ISDIR(st.st_mode)) {
+      ARROW_UNUSED(FileClose(*fd));
+      return Status::IOError("Cannot open for reading: path '", file_name.ToString(),
+                             "' is a directory");
+    }
+  }
 #endif
 
   return CheckFileOpResult(ret, errno_actual, file_name, "open local");
@@ -405,7 +465,7 @@ Status FileOpenWritable(const PlatformFilename& file_name, bool write_only, bool
   ret = *fd;
 
 #else
-  int oflag = O_CREAT | O_BINARY;
+  int oflag = O_CREAT;
 
   if (truncate) {
     oflag |= O_TRUNC;
@@ -423,7 +483,16 @@ Status FileOpenWritable(const PlatformFilename& file_name, bool write_only, bool
   ret = *fd = open(file_name.ToNative().c_str(), oflag, ARROW_WRITE_SHMODE);
   errno_actual = errno;
 #endif
-  return CheckFileOpResult(ret, errno_actual, file_name, "open local");
+  RETURN_NOT_OK(CheckFileOpResult(ret, errno_actual, file_name, "open local"));
+  if (append) {
+    // Seek to end, as O_APPEND does not necessarily do it
+    auto ret = lseek64_compat(*fd, 0, SEEK_END);
+    if (ret == -1) {
+      ARROW_UNUSED(FileClose(*fd));
+      return Status::IOError("lseek failed");
+    }
+  }
+  return Status::OK();
 }
 
 Status FileTell(int fd, int64_t* pos) {
@@ -452,7 +521,7 @@ Status CreatePipe(int fd[2]) {
 #endif
 
   if (ret == -1) {
-    return Status::IOError("Error creating pipe: ", std::strerror(errno));
+    return Status::IOError("Error creating pipe: ", ErrnoMessage(errno));
   }
   return Status::OK();
 }
@@ -461,7 +530,7 @@ static Status StatusFromErrno(const char* prefix) {
 #ifdef _WIN32
   errno = __map_mman_error(GetLastError(), EPERM);
 #endif
-  return Status::IOError(prefix, std::strerror(errno));
+  return Status::IOError(prefix, ErrnoMessage(errno));
 }
 
 //
@@ -643,8 +712,7 @@ Status FileRead(int fd, uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
   }
 
   if (*bytes_read == -1) {
-    return Status::IOError(std::string("Error reading bytes from file: ") +
-                           std::string(strerror(errno)));
+    return Status::IOError("Error reading bytes from file: ", ErrnoMessage(errno));
   }
 
   return Status::OK();
@@ -673,8 +741,7 @@ Status FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes,
   }
 
   if (*bytes_read == -1) {
-    return Status::IOError(std::string("Error reading bytes from file: ") +
-                           std::string(strerror(errno)));
+    return Status::IOError("Error reading bytes from file: ", ErrnoMessage(errno));
   }
   return Status::OK();
 }
@@ -704,8 +771,7 @@ Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes) {
   }
 
   if (ret == -1) {
-    return Status::IOError(std::string("Error writing bytes from file: ") +
-                           std::string(strerror(errno)));
+    return Status::IOError("Error writing bytes to file: ", ErrnoMessage(errno));
   }
   return Status::OK();
 }
@@ -722,8 +788,7 @@ Status FileTruncate(int fd, const int64_t size) {
 #endif
 
   if (ret == -1) {
-    return Status::IOError(std::string("Error truncating file: ") +
-                           std::string(strerror(errno_actual)));
+    return Status::IOError("Error truncating file: ", ErrnoMessage(errno_actual));
   }
   return Status::OK();
 }
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
index b883ec9..5d01f40 100644
--- a/cpp/src/arrow/util/io-util.h
+++ b/cpp/src/arrow/util/io-util.h
@@ -94,14 +94,17 @@ class ARROW_EXPORT StdinStream : public InputStream {
 
 namespace internal {
 
-class ARROW_EXPORT PlatformFilename {
- public:
+// NOTE: 8-bit path strings on Windows are encoded using UTF-8.
+// Using MBCS would fail encoding some paths.
+
 #if defined(_WIN32)
-  using NativePathString = std::wstring;
+using NativePathString = std::wstring;
 #else
-  using NativePathString = std::string;
+using NativePathString = std::string;
 #endif
 
+class ARROW_EXPORT PlatformFilename {
+ public:
   ~PlatformFilename();
   PlatformFilename();
   PlatformFilename(const PlatformFilename&);
@@ -126,6 +129,7 @@ class ARROW_EXPORT PlatformFilename {
 
   // Those functions need access to the embedded path object
   friend ARROW_EXPORT Status CreateDir(const PlatformFilename&, bool*);
+  friend ARROW_EXPORT Status CreateDirTree(const PlatformFilename&, bool*);
   friend ARROW_EXPORT Status DeleteDirTree(const PlatformFilename&, bool*);
   friend ARROW_EXPORT Status DeleteFile(const PlatformFilename&, bool*);
   friend ARROW_EXPORT Status FileExists(const PlatformFilename&, bool*);
@@ -134,6 +138,8 @@ class ARROW_EXPORT PlatformFilename {
 ARROW_EXPORT
 Status CreateDir(const PlatformFilename& dir_path, bool* created = NULLPTR);
 ARROW_EXPORT
+Status CreateDirTree(const PlatformFilename& dir_path, bool* created = NULLPTR);
+ARROW_EXPORT
 Status DeleteDirTree(const PlatformFilename& dir_path, bool* deleted = NULLPTR);
 ARROW_EXPORT
 Status DeleteFile(const PlatformFilename& file_path, bool* deleted = NULLPTR);
@@ -191,6 +197,13 @@ Status DelEnvVar(const char* name);
 ARROW_EXPORT
 Status DelEnvVar(const std::string& name);
 
+ARROW_EXPORT
+std::string ErrnoMessage(int errnum);
+#if _WIN32
+ARROW_EXPORT
+std::string WinErrorMessage(int errnum);
+#endif
+
 class ARROW_EXPORT TemporaryDir {
  public:
   ~TemporaryDir();