You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/05/30 17:12:30 UTC
[arrow] branch master updated: ARROW-5378: [C++] Local filesystem
implementation
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 30b4736 ARROW-5378: [C++] Local filesystem implementation
30b4736 is described below
commit 30b47360732ba483dd7fc818363587fea7972396
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Thu May 30 12:12:22 2019 -0500
ARROW-5378: [C++] Local filesystem implementation
Author: Antoine Pitrou <an...@python.org>
Closes #4379 from pitrou/ARROW-5378-local-filesystem and squashes the following commits:
c36e3dc8b <Antoine Pitrou> ARROW-5378: Local filesystem implementation
---
cpp/src/arrow/CMakeLists.txt | 2 +
cpp/src/arrow/filesystem/CMakeLists.txt | 1 +
cpp/src/arrow/filesystem/filesystem-test.cc | 305 +++++++++++++++++++++--
cpp/src/arrow/filesystem/filesystem.cc | 131 ++++++++++
cpp/src/arrow/filesystem/filesystem.h | 68 ++++-
cpp/src/arrow/filesystem/localfs-test.cc | 125 ++++++++++
cpp/src/arrow/filesystem/localfs.cc | 372 ++++++++++++++++++++++++++++
cpp/src/arrow/filesystem/localfs.h | 67 +++++
cpp/src/arrow/filesystem/mockfs.cc | 21 +-
cpp/src/arrow/filesystem/path-util.cc | 20 +-
cpp/src/arrow/filesystem/path-util.h | 7 +-
cpp/src/arrow/filesystem/test-util.cc | 143 ++++++-----
cpp/src/arrow/filesystem/test-util.h | 34 +++
cpp/src/arrow/filesystem/util-internal.cc | 45 ++++
cpp/src/arrow/filesystem/util-internal.h | 36 +++
cpp/src/arrow/io/file-test.cc | 8 +
cpp/src/arrow/io/file.cc | 21 +-
cpp/src/arrow/util/io-util-test.cc | 82 +++++-
cpp/src/arrow/util/io-util.cc | 143 ++++++++---
cpp/src/arrow/util/io-util.h | 21 +-
20 files changed, 1497 insertions(+), 155 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 606041a..c182ddb 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -106,8 +106,10 @@ set(ARROW_SRCS
csv/parser.cc
csv/reader.cc
filesystem/filesystem.cc
+ filesystem/localfs.cc
filesystem/mockfs.cc
filesystem/path-util.cc
+ filesystem/util-internal.cc
json/options.cc
json/chunked-builder.cc
json/chunker.cc
diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt
index da5e286..ea3d037 100644
--- a/cpp/src/arrow/filesystem/CMakeLists.txt
+++ b/cpp/src/arrow/filesystem/CMakeLists.txt
@@ -19,3 +19,4 @@
arrow_install_all_headers("arrow/filesystem")
add_arrow_test(filesystem-test)
+add_arrow_test(localfs-test)
diff --git a/cpp/src/arrow/filesystem/filesystem-test.cc b/cpp/src/arrow/filesystem/filesystem-test.cc
index 4f72e02..fa52f3c 100644
--- a/cpp/src/arrow/filesystem/filesystem-test.cc
+++ b/cpp/src/arrow/filesystem/filesystem-test.cc
@@ -118,6 +118,13 @@ TEST(PathUtil, ConcatAbstractPath) {
ASSERT_EQ("abc", ConcatAbstractPath("", "abc"));
ASSERT_EQ("abc/def", ConcatAbstractPath("abc", "def"));
ASSERT_EQ("abc/def/ghi", ConcatAbstractPath("abc/def", "ghi"));
+
+ ASSERT_EQ("abc/def", ConcatAbstractPath("abc/", "def"));
+ ASSERT_EQ("abc/def/ghi", ConcatAbstractPath("abc/def/", "ghi"));
+
+ ASSERT_EQ("/abc", ConcatAbstractPath("/", "abc"));
+ ASSERT_EQ("/abc/def", ConcatAbstractPath("/abc", "def"));
+ ASSERT_EQ("/abc/def", ConcatAbstractPath("/abc/", "def"));
}
TEST(PathUtil, JoinAbstractPath) {
@@ -128,6 +135,15 @@ TEST(PathUtil, JoinAbstractPath) {
ASSERT_EQ("", JoinAbstractPath(parts.begin(), parts.begin()));
}
+TEST(PathUtil, EnsureTrailingSlash) {
+  // Inputs that are empty or already end with a slash come back unchanged.
+  ASSERT_EQ("", EnsureTrailingSlash(""));
+  ASSERT_EQ("/", EnsureTrailingSlash("/"));
+  ASSERT_EQ("abc/", EnsureTrailingSlash("abc/"));
+  ASSERT_EQ("/abc/", EnsureTrailingSlash("/abc/"));
+  // Anything else gets exactly one slash appended.
+  ASSERT_EQ("abc/", EnsureTrailingSlash("abc"));
+  ASSERT_EQ("/abc/", EnsureTrailingSlash("/abc"));
+}
+
////////////////////////////////////////////////////////////////////////////
// Generic MockFileSystem tests
@@ -179,10 +195,7 @@ class TestMockFS : public ::testing::Test {
}
void CreateFile(const std::string& path, const std::string& data) {
- std::shared_ptr<io::OutputStream> stream;
- ASSERT_OK(fs_->OpenAppendStream(path, &stream));
- ASSERT_OK(WriteString(stream.get(), data));
- ASSERT_OK(stream->Close());
+ ::arrow::fs::CreateFile(fs_.get(), path, data);
}
protected:
@@ -190,23 +203,6 @@ class TestMockFS : public ::testing::Test {
std::shared_ptr<MockFileSystem> fs_;
};
-void AssertFileStats(const FileStats& st, const std::string& path, FileType type) {
- ASSERT_EQ(st.path(), path);
- ASSERT_EQ(st.type(), type);
-}
-
-void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
- TimePoint mtime) {
- AssertFileStats(st, path, type);
- ASSERT_EQ(st.mtime(), mtime);
-}
-
-void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
- TimePoint mtime, int64_t size) {
- AssertFileStats(st, path, type, mtime);
- ASSERT_EQ(st.size(), size);
-}
-
TEST_F(TestMockFS, Empty) {
CheckDirs({});
CheckFiles({});
@@ -317,6 +313,10 @@ TEST_F(TestMockFS, GetTargetStatsSelector) {
AssertFileStats(stats[0], "AB", FileType::Directory, time_);
AssertFileStats(stats[1], "AB/CD", FileType::Directory, time_);
AssertFileStats(stats[2], "ab", FileType::File, time_, 4);
+
+ // Invalid path
+ s.base_dir = "//foo//bar//baz//";
+ ASSERT_RAISES(Invalid, fs_->GetTargetStats(s, &stats));
}
TEST_F(TestMockFS, OpenOutputStream) {
@@ -342,6 +342,269 @@ TEST_F(TestMockFS, OpenAppendStream) {
CheckFiles({{"ab", time_, "some data"}});
}
+////////////////////////////////////////////////////////////////////////////
+// Concrete SubTreeFileSystem tests
+
+// Fixture: a SubTreeFileSystem (`subfs_`) rooted at "sub/tree" inside the
+// MockFileSystem `fs_` provided by TestMockFS.
+class TestSubTreeFileSystem : public TestMockFS {
+ public:
+ void SetUp() override {
+ TestMockFS::SetUp();
+ // The subtree root must exist in the underlying filesystem.
+ ASSERT_OK(fs_->CreateDir("sub/tree"));
+ subfs_ = std::make_shared<SubTreeFileSystem>("sub/tree", fs_);
+ }
+
+ // Hides TestMockFS::CreateFile so that paths are interpreted relative to
+ // the subtree rather than the underlying mock filesystem.
+ void CreateFile(const std::string& path, const std::string& data) {
+ ::arrow::fs::CreateFile(subfs_.get(), path, data);
+ }
+
+ protected:
+ std::shared_ptr<SubTreeFileSystem> subfs_;
+};
+
+// Directories created through the subtree must land under "sub/tree" in the
+// underlying mock filesystem.
+TEST_F(TestSubTreeFileSystem, CreateDir) {
+ ASSERT_OK(subfs_->CreateDir("AB"));
+ ASSERT_OK(subfs_->CreateDir("AB/CD/EF")); // Recursive
+ // Non-recursive, parent doesn't exist
+ ASSERT_RAISES(IOError, subfs_->CreateDir("AB/GH/IJ", false /* recursive */));
+ ASSERT_OK(subfs_->CreateDir("AB/GH", false /* recursive */));
+ ASSERT_OK(subfs_->CreateDir("AB/GH/IJ", false /* recursive */));
+ // Can't create root dir
+ ASSERT_RAISES(IOError, subfs_->CreateDir(""));
+ // Expectations are spelled with underlying-filesystem paths.
+ CheckDirs({{"sub", time_},
+ {"sub/tree", time_},
+ {"sub/tree/AB", time_},
+ {"sub/tree/AB/CD", time_},
+ {"sub/tree/AB/CD/EF", time_},
+ {"sub/tree/AB/GH", time_},
+ {"sub/tree/AB/GH/IJ", time_}});
+ CheckFiles({});
+}
+
+// Deleting directories through the subtree, including the refusal to delete
+// the subtree root itself.
+TEST_F(TestSubTreeFileSystem, DeleteDir) {
+ ASSERT_OK(subfs_->CreateDir("AB/CD/EF"));
+ ASSERT_OK(subfs_->CreateDir("AB/GH/IJ"));
+ // Deleting AB/CD also removes its child EF (not listed below).
+ ASSERT_OK(subfs_->DeleteDir("AB/CD"));
+ ASSERT_OK(subfs_->DeleteDir("AB/GH/IJ"));
+ CheckDirs({{"sub", time_},
+ {"sub/tree", time_},
+ {"sub/tree/AB", time_},
+ {"sub/tree/AB/GH", time_}});
+ CheckFiles({});
+ // AB/CD was already deleted above.
+ ASSERT_RAISES(IOError, subfs_->DeleteDir("AB/CD"));
+ ASSERT_OK(subfs_->DeleteDir("AB"));
+ CheckDirs({{"sub", time_}, {"sub/tree", time_}});
+ CheckFiles({});
+
+ // Can't delete root dir
+ ASSERT_RAISES(IOError, subfs_->DeleteDir(""));
+ CheckDirs({{"sub", time_}, {"sub/tree", time_}});
+ CheckFiles({});
+}
+
+// Deleting files at the subtree top level and in a subdirectory.
+TEST_F(TestSubTreeFileSystem, DeleteFile) {
+ ASSERT_OK(subfs_->CreateDir("AB"));
+
+ CreateFile("ab", "");
+ CheckFiles({{"sub/tree/ab", time_, ""}});
+ ASSERT_OK(subfs_->DeleteFile("ab"));
+ CheckFiles({});
+
+ CreateFile("AB/cd", "");
+ CheckFiles({{"sub/tree/AB/cd", time_, ""}});
+ ASSERT_OK(subfs_->DeleteFile("AB/cd"));
+ CheckFiles({});
+
+ // Missing file, and the empty path (subtree root), are both errors.
+ ASSERT_RAISES(IOError, subfs_->DeleteFile("non-existent"));
+ ASSERT_RAISES(IOError, subfs_->DeleteFile(""));
+}
+
+// Renaming files within the subtree; an empty source or destination is rejected
+// and leaves the filesystem untouched.
+TEST_F(TestSubTreeFileSystem, MoveFile) {
+ CreateFile("ab", "");
+ CheckFiles({{"sub/tree/ab", time_, ""}});
+ ASSERT_OK(subfs_->Move("ab", "cd"));
+ CheckFiles({{"sub/tree/cd", time_, ""}});
+
+ ASSERT_OK(subfs_->CreateDir("AB"));
+ ASSERT_OK(subfs_->Move("cd", "AB/ef"));
+ CheckFiles({{"sub/tree/AB/ef", time_, ""}});
+
+ ASSERT_RAISES(IOError, subfs_->Move("AB/ef", ""));
+ ASSERT_RAISES(IOError, subfs_->Move("", "xxx"));
+ CheckFiles({{"sub/tree/AB/ef", time_, ""}});
+ CheckDirs({{"sub", time_}, {"sub/tree", time_}, {"sub/tree/AB", time_}});
+}
+
+// Moving a directory carries its children along (GH/EF below).
+TEST_F(TestSubTreeFileSystem, MoveDir) {
+ ASSERT_OK(subfs_->CreateDir("AB/CD/EF"));
+ ASSERT_OK(subfs_->Move("AB/CD", "GH"));
+ CheckDirs({{"sub", time_},
+ {"sub/tree", time_},
+ {"sub/tree/AB", time_},
+ {"sub/tree/GH", time_},
+ {"sub/tree/GH/EF", time_}});
+
+ // The subtree root is not a valid destination.
+ ASSERT_RAISES(IOError, subfs_->Move("AB", ""));
+}
+
+// Copying files within the subtree; the source is kept and contents match.
+TEST_F(TestSubTreeFileSystem, CopyFile) {
+ CreateFile("ab", "data");
+ CheckFiles({{"sub/tree/ab", time_, "data"}});
+ ASSERT_OK(subfs_->CopyFile("ab", "cd"));
+ CheckFiles({{"sub/tree/ab", time_, "data"}, {"sub/tree/cd", time_, "data"}});
+
+ ASSERT_OK(subfs_->CreateDir("AB"));
+ ASSERT_OK(subfs_->CopyFile("cd", "AB/ef"));
+ CheckFiles({{"sub/tree/AB/ef", time_, "data"},
+ {"sub/tree/ab", time_, "data"},
+ {"sub/tree/cd", time_, "data"}});
+
+ // Empty source/destination is rejected without side effects.
+ ASSERT_RAISES(IOError, subfs_->CopyFile("ab", ""));
+ ASSERT_RAISES(IOError, subfs_->CopyFile("", "xxx"));
+ CheckFiles({{"sub/tree/AB/ef", time_, "data"},
+ {"sub/tree/ab", time_, "data"},
+ {"sub/tree/cd", time_, "data"}});
+}
+
+// Sequential reads through the subtree return the file contents.
+TEST_F(TestSubTreeFileSystem, OpenInputStream) {
+ std::shared_ptr<io::InputStream> stream;
+ std::shared_ptr<Buffer> buffer;
+ CreateFile("ab", "data");
+
+ ASSERT_OK(subfs_->OpenInputStream("ab", &stream));
+ ASSERT_OK(stream->Read(4, &buffer));
+ AssertBufferEqual(*buffer, "data");
+ ASSERT_OK(stream->Close());
+
+ ASSERT_RAISES(IOError, subfs_->OpenInputStream("non-existent", &stream));
+ ASSERT_RAISES(IOError, subfs_->OpenInputStream("", &stream));
+}
+
+// Random-access reads through the subtree.
+TEST_F(TestSubTreeFileSystem, OpenInputFile) {
+ std::shared_ptr<io::RandomAccessFile> stream;
+ std::shared_ptr<Buffer> buffer;
+ CreateFile("ab", "some data");
+
+ ASSERT_OK(subfs_->OpenInputFile("ab", &stream));
+ // 4 bytes at offset 5 of "some data" is "data".
+ ASSERT_OK(stream->ReadAt(5, 4, &buffer));
+ AssertBufferEqual(*buffer, "data");
+ ASSERT_OK(stream->Close());
+
+ ASSERT_RAISES(IOError, subfs_->OpenInputFile("non-existent", &stream));
+ ASSERT_RAISES(IOError, subfs_->OpenInputFile("", &stream));
+}
+
+// Writing new files through the subtree, plus the failure cases.
+TEST_F(TestSubTreeFileSystem, OpenOutputStream) {
+ std::shared_ptr<io::OutputStream> stream;
+
+ ASSERT_OK(subfs_->OpenOutputStream("ab", &stream));
+ ASSERT_OK(stream->Write("data"));
+ ASSERT_OK(stream->Close());
+ CheckFiles({{"sub/tree/ab", time_, "data"}});
+
+ ASSERT_OK(subfs_->CreateDir("AB"));
+ ASSERT_OK(subfs_->OpenOutputStream("AB/cd", &stream));
+ ASSERT_OK(stream->Write("other"));
+ ASSERT_OK(stream->Close());
+ CheckFiles({{"sub/tree/AB/cd", time_, "other"}, {"sub/tree/ab", time_, "data"}});
+
+ // Failures: missing parent directory, an existing directory, the subtree root.
+ ASSERT_RAISES(IOError, subfs_->OpenOutputStream("non-existent/xxx", &stream));
+ ASSERT_RAISES(IOError, subfs_->OpenOutputStream("AB", &stream));
+ ASSERT_RAISES(IOError, subfs_->OpenOutputStream("", &stream));
+ CheckFiles({{"sub/tree/AB/cd", time_, "other"}, {"sub/tree/ab", time_, "data"}});
+}
+
+// The first append creates the file; the second appends to its contents.
+TEST_F(TestSubTreeFileSystem, OpenAppendStream) {
+ std::shared_ptr<io::OutputStream> stream;
+
+ ASSERT_OK(subfs_->OpenAppendStream("ab", &stream));
+ ASSERT_OK(stream->Write("some"));
+ ASSERT_OK(stream->Close());
+ CheckFiles({{"sub/tree/ab", time_, "some"}});
+
+ ASSERT_OK(subfs_->OpenAppendStream("ab", &stream));
+ ASSERT_OK(stream->Write(" data"));
+ ASSERT_OK(stream->Close());
+ CheckFiles({{"sub/tree/ab", time_, "some data"}});
+}
+
+// Single-path stats: returned paths are subtree-relative (the "sub/tree/"
+// prefix is stripped).
+TEST_F(TestSubTreeFileSystem, GetTargetStatsSingle) {
+ FileStats st;
+ ASSERT_OK(subfs_->CreateDir("AB/CD"));
+
+ ASSERT_OK(subfs_->GetTargetStats("AB", &st));
+ AssertFileStats(st, "AB", FileType::Directory, time_);
+ ASSERT_OK(subfs_->GetTargetStats("AB/CD", &st));
+ AssertFileStats(st, "AB/CD", FileType::Directory, time_);
+
+ CreateFile("ab", "data");
+ ASSERT_OK(subfs_->GetTargetStats("ab", &st));
+ AssertFileStats(st, "ab", FileType::File, time_, 4);
+
+ // A missing path is not an error: it is reported as FileType::NonExistent.
+ ASSERT_OK(subfs_->GetTargetStats("non-existent", &st));
+ AssertFileStats(st, "non-existent", FileType::NonExistent);
+}
+
+// Multi-path stats: results come back in the same order as the requested paths.
+TEST_F(TestSubTreeFileSystem, GetTargetStatsVector) {
+ std::vector<FileStats> stats;
+
+ ASSERT_OK(subfs_->CreateDir("AB/CD"));
+ CreateFile("ab", "data");
+ CreateFile("AB/cd", "other data");
+
+ ASSERT_OK(subfs_->GetTargetStats({"ab", "AB", "AB/cd", "non-existent"}, &stats));
+ ASSERT_EQ(stats.size(), 4);
+ AssertFileStats(stats[0], "ab", FileType::File, time_, 4);
+ AssertFileStats(stats[1], "AB", FileType::Directory, time_);
+ AssertFileStats(stats[2], "AB/cd", FileType::File, time_, 10);
+ AssertFileStats(stats[3], "non-existent", FileType::NonExistent);
+}
+
+// Selector-based listing through the subtree, recursive and not, from a
+// subdirectory and from the subtree root.
+// NOTE(review): the expected orderings assume results sorted byte-wise by path
+// (uppercase before lowercase); that ordering comes from the underlying fs.
+TEST_F(TestSubTreeFileSystem, GetTargetStatsSelector) {
+ std::vector<FileStats> stats;
+ Selector selector;
+
+ ASSERT_OK(subfs_->CreateDir("AB/CD"));
+ CreateFile("ab", "data");
+ CreateFile("AB/cd", "data2");
+ CreateFile("AB/CD/ef", "data34");
+
+ selector.base_dir = "AB";
+ selector.recursive = false;
+ ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+ ASSERT_EQ(stats.size(), 2);
+ AssertFileStats(stats[0], "AB/CD", FileType::Directory, time_);
+ AssertFileStats(stats[1], "AB/cd", FileType::File, time_, 5);
+
+ selector.recursive = true;
+ ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+ ASSERT_EQ(stats.size(), 3);
+ AssertFileStats(stats[0], "AB/CD", FileType::Directory, time_);
+ AssertFileStats(stats[1], "AB/CD/ef", FileType::File, time_, 6);
+ AssertFileStats(stats[2], "AB/cd", FileType::File, time_, 5);
+
+ // An empty base_dir selects from the subtree root.
+ selector.base_dir = "";
+ selector.recursive = false;
+ ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+ ASSERT_EQ(stats.size(), 2);
+ AssertFileStats(stats[0], "AB", FileType::Directory, time_);
+ AssertFileStats(stats[1], "ab", FileType::File, time_, 4);
+
+ selector.recursive = true;
+ ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+ ASSERT_EQ(stats.size(), 5);
+ AssertFileStats(stats[0], "AB", FileType::Directory, time_);
+ AssertFileStats(stats[1], "AB/CD", FileType::Directory, time_);
+ AssertFileStats(stats[2], "AB/CD/ef", FileType::File, time_, 6);
+ AssertFileStats(stats[3], "AB/cd", FileType::File, time_, 5);
+ AssertFileStats(stats[4], "ab", FileType::File, time_, 4);
+
+ // A missing base_dir is an error unless allow_non_existent is set.
+ selector.base_dir = "non-existent";
+ ASSERT_RAISES(IOError, subfs_->GetTargetStats(selector, &stats));
+ selector.allow_non_existent = true;
+ ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
+ ASSERT_EQ(stats.size(), 0);
+}
+
} // namespace internal
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc
index 0ca9dec..1e35ccd 100644
--- a/cpp/src/arrow/filesystem/filesystem.cc
+++ b/cpp/src/arrow/filesystem/filesystem.cc
@@ -24,6 +24,11 @@
namespace arrow {
namespace fs {
+using internal::ConcatAbstractPath;
+using internal::EnsureTrailingSlash;
+using internal::GetAbstractPathParent;
+using internal::kSep;
+
std::string ToString(FileType ftype) {
switch (ftype) {
case FileType::NonExistent:
@@ -70,5 +75,131 @@ Status FileSystem::DeleteFiles(const std::vector<std::string>& paths) {
return st;
}
+//////////////////////////////////////////////////////////////////////////
+// SubTreeFileSystem implementation
+
+// FIXME EnsureTrailingSlash works on abstract paths... but we will be
+// passing a concrete path, e.g. "C:" on Windows.
+
+// base_path is normalized to end with a slash (unless empty) so that the
+// prefix check in StripBase stops on a path-component boundary.
+// base_fs is a sink parameter: it is moved into place rather than copied,
+// saving an atomic reference-count increment/decrement pair.
+SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path,
+                                     std::shared_ptr<FileSystem> base_fs)
+    : base_path_(EnsureTrailingSlash(base_path)), base_fs_(std::move(base_fs)) {}
+
+// Nothing to release explicitly: members clean up after themselves.
+SubTreeFileSystem::~SubTreeFileSystem() = default;
+
+// Map a subtree-relative path onto the underlying filesystem.
+// The empty path designates the subtree root, i.e. base_path_ itself.
+std::string SubTreeFileSystem::PrependBase(const std::string& s) const {
+  return s.empty() ? base_path_ : ConcatAbstractPath(base_path_, s);
+}
+
+// In-place variant of PrependBase that refuses the empty path (the subtree
+// root), returning an IOError and leaving *s untouched in that case.
+Status SubTreeFileSystem::PrependBaseNonEmpty(std::string* s) const {
+  if (!s->empty()) {
+    *s = ConcatAbstractPath(base_path_, *s);
+    return Status::OK();
+  }
+  return Status::IOError("Empty path");
+}
+
+// Inverse of PrependBase: turn a path reported by the underlying filesystem
+// back into a subtree-relative path written to *out.
+// Returns UnknownError if `s` is not under base_path_, since that means the
+// underlying filesystem returned something outside the subtree.
+Status SubTreeFileSystem::StripBase(const std::string& s, std::string* out) const {
+  const auto len = base_path_.length();
+  // Note base_path_ ends with a slash (if not empty).
+  // compare() checks the prefix in place instead of materializing a
+  // temporary s.substr(0, len) just to compare it.
+  if (s.length() >= len && s.compare(0, len, base_path_) == 0) {
+    *out = s.substr(len);
+    return Status::OK();
+  }
+  return Status::UnknownError("Underlying filesystem returned path '", s,
+                              "', which is not a subpath of '", base_path_, "'");
+}
+
+// Rewrite the path held by *st from underlying-filesystem form to
+// subtree-relative form; fails if the path lies outside the subtree.
+Status SubTreeFileSystem::FixStats(FileStats* st) const {
+  std::string relative_path;
+  RETURN_NOT_OK(StripBase(st->path(), &relative_path));
+  st->set_path(relative_path);
+  return Status::OK();
+}
+
+// Stat a single path; the empty path is allowed and designates the subtree root.
+Status SubTreeFileSystem::GetTargetStats(const std::string& path, FileStats* out) {
+  const std::string full_path = PrependBase(path);
+  RETURN_NOT_OK(base_fs_->GetTargetStats(full_path, out));
+  return FixStats(out);
+}
+
+// Run the caller's selection on the underlying filesystem, rooted under the
+// subtree, then translate every returned path back to subtree-relative form.
+Status SubTreeFileSystem::GetTargetStats(const Selector& select,
+                                         std::vector<FileStats>* out) {
+  Selector base_selector(select);
+  base_selector.base_dir = PrependBase(select.base_dir);
+  RETURN_NOT_OK(base_fs_->GetTargetStats(base_selector, out));
+  for (FileStats& stats : *out) {
+    RETURN_NOT_OK(FixStats(&stats));
+  }
+  return Status::OK();
+}
+
+// Forward to the underlying filesystem; the subtree root cannot be created.
+Status SubTreeFileSystem::CreateDir(const std::string& path, bool recursive) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->CreateDir(full_path, recursive);
+}
+
+// Forward to the underlying filesystem; the subtree root cannot be deleted.
+Status SubTreeFileSystem::DeleteDir(const std::string& path) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->DeleteDir(full_path);
+}
+
+// Forward to the underlying filesystem; an empty path is rejected.
+Status SubTreeFileSystem::DeleteFile(const std::string& path) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->DeleteFile(full_path);
+}
+
+// Forward to the underlying filesystem; source is validated before destination,
+// and neither may be empty.
+Status SubTreeFileSystem::Move(const std::string& src, const std::string& dest) {
+  std::string full_src(src);
+  std::string full_dest(dest);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_src));
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_dest));
+  return base_fs_->Move(full_src, full_dest);
+}
+
+// Forward to the underlying filesystem; source is validated before destination,
+// and neither may be empty.
+Status SubTreeFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  std::string full_src(src);
+  std::string full_dest(dest);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_src));
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_dest));
+  return base_fs_->CopyFile(full_src, full_dest);
+}
+
+// Open a file for sequential reading; an empty path is rejected.
+Status SubTreeFileSystem::OpenInputStream(const std::string& path,
+                                          std::shared_ptr<io::InputStream>* out) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenInputStream(full_path, out);
+}
+
+// Open a file for random-access reading; an empty path is rejected.
+Status SubTreeFileSystem::OpenInputFile(const std::string& path,
+                                        std::shared_ptr<io::RandomAccessFile>* out) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenInputFile(full_path, out);
+}
+
+// Open a file for writing; an empty path is rejected.
+Status SubTreeFileSystem::OpenOutputStream(const std::string& path,
+                                           std::shared_ptr<io::OutputStream>* out) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenOutputStream(full_path, out);
+}
+
+// Open a file for appending; an empty path is rejected.
+Status SubTreeFileSystem::OpenAppendStream(const std::string& path,
+                                           std::shared_ptr<io::OutputStream>* out) {
+  std::string full_path(path);
+  RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
+  return base_fs_->OpenAppendStream(full_path, out);
+}
+
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index e9783e7..9a3e5a0 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -27,6 +27,17 @@
#include "arrow/util/compression.h"
#include "arrow/util/visibility.h"
+// The Windows API defines macros from *File resolving to either
+// *FileA or *FileW. Need to undo them.
+#ifdef _WIN32
+#ifdef DeleteFile
+#undef DeleteFile
+#endif
+#ifdef CopyFile
+#undef CopyFile
+#endif
+#endif
+
namespace arrow {
namespace io {
@@ -134,6 +145,8 @@ class ARROW_EXPORT FileSystem {
virtual Status GetTargetStats(const Selector& select, std::vector<FileStats>* out) = 0;
/// Create a directory and subdirectories.
+ ///
+ /// This function succeeds if the directory already exists.
virtual Status CreateDir(const std::string& path, bool recursive = true) = 0;
/// Delete a directory and its contents, recursively.
@@ -148,8 +161,10 @@ class ARROW_EXPORT FileSystem {
/// Move / rename a file or directory.
///
- /// If the destination exists and is a directory, an error is returned.
- /// Otherwise, it is replaced.
+ /// If the destination exists:
+ /// - if it is a non-empty directory, an error is returned
+ /// - otherwise, if it has the same type as the source, it is replaced
+ /// - otherwise, behavior is unspecified (implementation-dependent).
virtual Status Move(const std::string& src, const std::string& dest) = 0;
/// Copy a file.
@@ -179,5 +194,54 @@ class ARROW_EXPORT FileSystem {
std::shared_ptr<io::OutputStream>* out) = 0;
};
+/// \brief EXPERIMENTAL: a FileSystem implementation that delegates to another
+/// implementation after prepending a fixed base path.
+///
+/// This is useful to expose a logical view of a subtree of a filesystem,
+/// for example a directory in a LocalFileSystem.
+/// This makes no security guarantees: for example, symlinks may allow
+/// "escaping" the subtree to access other parts of the underlying filesystem.
+///
+/// Paths given to this class are relative to the subtree root. The empty
+/// path designates the subtree root itself: it can be stat'ed or listed, but
+/// creating, deleting, moving, copying or opening it fails with an IOError.
+class ARROW_EXPORT SubTreeFileSystem : public FileSystem {
+ public:
+ /// \param base_path location of the subtree root inside `base_fs`;
+ /// normalized to end with a slash (unless empty)
+ /// \param base_fs the underlying filesystem all calls are delegated to
+ explicit SubTreeFileSystem(const std::string& base_path,
+ std::shared_ptr<FileSystem> base_fs);
+ ~SubTreeFileSystem() override;
+
+ using FileSystem::GetTargetStats;
+ Status GetTargetStats(const std::string& path, FileStats* out) override;
+ Status GetTargetStats(const Selector& select, std::vector<FileStats>* out) override;
+
+ Status CreateDir(const std::string& path, bool recursive = true) override;
+
+ Status DeleteDir(const std::string& path) override;
+
+ Status DeleteFile(const std::string& path) override;
+
+ Status Move(const std::string& src, const std::string& dest) override;
+
+ Status CopyFile(const std::string& src, const std::string& dest) override;
+
+ Status OpenInputStream(const std::string& path,
+ std::shared_ptr<io::InputStream>* out) override;
+
+ Status OpenInputFile(const std::string& path,
+ std::shared_ptr<io::RandomAccessFile>* out) override;
+
+ Status OpenOutputStream(const std::string& path,
+ std::shared_ptr<io::OutputStream>* out) override;
+
+ Status OpenAppendStream(const std::string& path,
+ std::shared_ptr<io::OutputStream>* out) override;
+
+ protected:
+ /// Subtree root in the underlying filesystem; ends with a slash unless empty.
+ const std::string base_path_;
+ std::shared_ptr<FileSystem> base_fs_;
+
+ /// Subtree-relative path -> underlying path ("" maps to base_path_).
+ std::string PrependBase(const std::string& s) const;
+ /// In-place PrependBase that fails with IOError on an empty path.
+ Status PrependBaseNonEmpty(std::string* s) const;
+ /// Underlying path -> subtree-relative path; UnknownError if outside the subtree.
+ Status StripBase(const std::string& s, std::string* out) const;
+ /// Apply StripBase to the path stored in `st`.
+ Status FixStats(FileStats* st) const;
+};
+
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/localfs-test.cc b/cpp/src/arrow/filesystem/localfs-test.cc
new file mode 100644
index 0000000..ef36ea4
--- /dev/null
+++ b/cpp/src/arrow/filesystem/localfs-test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/test-util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+using ::arrow::internal::TemporaryDir;
+
+// Current wall-clock time, truncated to TimePoint's duration so that it can
+// be compared directly against reported modification times.
+TimePoint CurrentTimePoint() {
+  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+  return TimePoint(std::chrono::duration_cast<TimePoint::duration>(since_epoch));
+}
+
+////////////////////////////////////////////////////////////////////////////
+// Generic LocalFileSystem tests
+
+// Runs the generic filesystem test suite against a LocalFileSystem confined to
+// a fresh temporary directory through a SubTreeFileSystem.
+class TestLocalFSGeneric : public ::testing::Test, public GenericFileSystemTest {
+ public:
+ void SetUp() override {
+ ASSERT_OK(TemporaryDir::Make("test-localfs-", &temp_dir_));
+ local_fs_ = std::make_shared<LocalFileSystem>();
+ fs_ = std::make_shared<SubTreeFileSystem>(temp_dir_->path().ToString(), local_fs_);
+ }
+
+ protected:
+ std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return fs_; }
+
+ // Members are destroyed in reverse declaration order, so temp_dir_ outlives
+ // the filesystem objects that point into it.
+ std::unique_ptr<TemporaryDir> temp_dir_;
+ std::shared_ptr<LocalFileSystem> local_fs_;
+ std::shared_ptr<FileSystem> fs_;
+};
+
+GENERIC_FS_TEST_FUNCTIONS(TestLocalFSGeneric);
+
+////////////////////////////////////////////////////////////////////////////
+// Concrete LocalFileSystem tests
+
+// Fixture for LocalFileSystem-specific tests: a fresh temporary directory is
+// exposed through a SubTreeFileSystem (`fs_`), so tests use short relative paths.
+class TestLocalFS : public ::testing::Test {
+ public:
+  // `override` makes the compiler verify this really overrides
+  // ::testing::Test::SetUp (a typo would otherwise silently skip setup).
+  void SetUp() override {
+    ASSERT_OK(TemporaryDir::Make("test-localfs-", &temp_dir_));
+    local_fs_ = std::make_shared<LocalFileSystem>();
+    fs_ = std::make_shared<SubTreeFileSystem>(temp_dir_->path().ToString(), local_fs_);
+  }
+
+ protected:
+  // Destroyed in reverse declaration order: the filesystems before temp_dir_.
+  std::unique_ptr<TemporaryDir> temp_dir_;
+  std::shared_ptr<LocalFileSystem> local_fs_;
+  std::shared_ptr<FileSystem> fs_;
+};
+
+// Directory mtimes reported by LocalFileSystem must fall between wall-clock
+// samples taken before and after creation, within kTimeSlack.
+TEST_F(TestLocalFS, DirectoryMTime) {
+ TimePoint t1 = CurrentTimePoint();
+ ASSERT_OK(fs_->CreateDir("AB/CD/EF"));
+ TimePoint t2 = CurrentTimePoint();
+
+ std::vector<FileStats> stats;
+ ASSERT_OK(fs_->GetTargetStats({"AB", "AB/CD/EF"}, &stats));
+ ASSERT_EQ(stats.size(), 2);
+ AssertFileStats(stats[0], "AB", FileType::Directory);
+ AssertFileStats(stats[1], "AB/CD/EF", FileType::Directory);
+
+ // NOTE: creating AB/CD updates AB's modification time, but creating
+ // AB/CD/EF doesn't. So AB/CD/EF's modification time should always be
+ // the same as or after AB's modification time.
+ AssertDurationBetween(stats[1].mtime() - stats[0].mtime(), 0, kTimeSlack);
+ // Depending on filesystem time granularity, the recorded time could be
+ // before the system time when doing the modification.
+ AssertDurationBetween(stats[0].mtime() - t1, -kTimeSlack, kTimeSlack);
+ AssertDurationBetween(t2 - stats[1].mtime(), -kTimeSlack, kTimeSlack);
+}
+
+// File mtimes reported by LocalFileSystem must fall between wall-clock samples
+// taken before and after creation, within kTimeSlack.
+TEST_F(TestLocalFS, FileMTime) {
+ TimePoint t1 = CurrentTimePoint();
+ ASSERT_OK(fs_->CreateDir("AB/CD"));
+ CreateFile(fs_.get(), "AB/CD/ab", "data");
+ TimePoint t2 = CurrentTimePoint();
+
+ std::vector<FileStats> stats;
+ ASSERT_OK(fs_->GetTargetStats({"AB", "AB/CD/ab"}, &stats));
+ ASSERT_EQ(stats.size(), 2);
+ AssertFileStats(stats[0], "AB", FileType::Directory);
+ // NOTE(review): 4 is presumably the expected size ("data"), relying on an
+ // AssertFileStats(st, path, type, size) overload in test-util.h — confirm.
+ AssertFileStats(stats[1], "AB/CD/ab", FileType::File, 4);
+
+ // The file is written after AB is last modified, so its mtime should not
+ // precede AB's.
+ AssertDurationBetween(stats[1].mtime() - stats[0].mtime(), 0, kTimeSlack);
+ AssertDurationBetween(stats[0].mtime() - t1, -kTimeSlack, kTimeSlack);
+ AssertDurationBetween(t2 - stats[1].mtime(), -kTimeSlack, kTimeSlack);
+}
+
+// TODO check UNC paths on Windows, e.g. "\\server\share\path\file" (networked)
+// or "\\?\C:\path\file" (extended-length paths)
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc
new file mode 100644
index 0000000..4ba07e7
--- /dev/null
+++ b/cpp/src/arrow/filesystem/localfs.cc
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstring>
+#include <utility>
+
+#ifdef _WIN32
+#include "arrow/util/windows_compatibility.h"
+#else
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#endif
+
+#include <boost/filesystem.hpp>
+
+#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/util-internal.h"
+#include "arrow/io/file.h"
+#include "arrow/util/io-util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace fs {
+
+namespace bfs = ::boost::filesystem;
+
+using ::arrow::internal::NativePathString;
+using ::arrow::internal::PlatformFilename;
+
+namespace {
+
+// Bracket Boost.Filesystem calls so that a thrown bfs::filesystem_error is
+// converted (via ToStatus) into an IOError returned from the enclosing
+// function.  The two macros must always be used as a pair.
+// The exception is caught by const reference: it is only inspected.
+#define BOOST_FILESYSTEM_TRY try {
+#define BOOST_FILESYSTEM_CATCH                  \
+  }                                             \
+  catch (const bfs::filesystem_error& _err) {   \
+    return ToStatus(_err);                      \
+  }
+
+// Convert a Boost.Filesystem exception into an Arrow IOError.
+// NOTE: filesystem_error is preferred over system::error_code because its
+// what() message also names the file path(s) involved.
+
+Status ToStatus(const bfs::filesystem_error& err) {
+  const char* message = err.what();
+  return Status::IOError(message);
+}
+
+// Build an IOError whose message is the caller-supplied parts followed by the
+// description of the current errno value.  Callers supply any separator.
+template <typename... Args>
+Status ErrnoToStatus(Args&&... args) {
+  const int errnum = errno;
+  auto detail = ::arrow::internal::ErrnoMessage(errnum);
+  return Status::IOError(std::forward<Args>(args)..., detail);
+}
+
+#ifdef _WIN32
+
+// Convert a native path string to its std::string form via PlatformFilename.
+std::string NativeToString(const NativePathString& ns) {
+  return PlatformFilename(ns).ToString();
+}
+
+// Build an IOError from the caller-supplied parts followed by the description
+// of the calling thread's last Win32 error.  Callers supply any separator.
+template <typename... Args>
+Status WinErrorToStatus(Args&&... args) {
+  const DWORD last_error = GetLastError();
+  auto detail = ::arrow::internal::WinErrorMessage(last_error);
+  return Status::IOError(std::forward<Args>(args)..., detail);
+}
+
+// Convert a Win32 FILETIME (100ns ticks since 1601-01-01 UTC) into a
+// TimePoint measured from the Unix epoch.
+TimePoint ToTimePoint(FILETIME ft) {
+  // Number of 100ns ticks between 1601-01-01 (UTC) and the Unix epoch.
+  static constexpr int64_t kFileTimeEpoch = 11644473600LL * 10000000;
+
+  const int64_t raw_ticks =
+      (static_cast<int64_t>(ft.dwHighDateTime) << 32) + ft.dwLowDateTime;
+  const int64_t unix_ticks = raw_ticks - kFileTimeEpoch;
+  const std::chrono::nanoseconds since_unix_epoch(100 * unix_ticks);
+  return TimePoint(std::chrono::duration_cast<TimePoint::duration>(since_unix_epoch));
+}
+
+// Translate the attributes returned by GetFileInformationByHandle() into a
+// FileStats.  The path is not set here.
+FileStats FileInformationToFileStat(const BY_HANDLE_FILE_INFORMATION& info) {
+  const bool is_dir = (info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0;
+  FileStats st;
+  // Anything that isn't a directory is reported as a regular file.
+  st.set_type(is_dir ? FileType::Directory : FileType::File);
+  st.set_size(is_dir ? kNoSize
+                     : (static_cast<int64_t>(info.nFileSizeHigh) << 32) +
+                           info.nFileSizeLow);
+  st.set_mtime(ToTimePoint(info.ftLastWriteTime));
+  return st;
+}
+
+// Stat `path` (Windows).  A missing file or path component yields
+// FileType::NonExistent with OK status; any other failure is an IOError whose
+// message ends with the Win32 error description.
+Status StatFile(const std::wstring& path, FileStats* out) {
+  const std::string bytes_path = NativeToString(path);
+
+  /* Inspired by CPython, see Modules/posixmodule.c */
+  HANDLE h = CreateFileW(path.c_str(), FILE_READ_ATTRIBUTES, /* desired access */
+                         0,                                  /* share mode */
+                         NULL, /* security attributes */
+                         OPEN_EXISTING,
+                         /* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */
+                         FILE_ATTRIBUTE_NORMAL | FILE_FLAG_BACKUP_SEMANTICS, NULL);
+
+  if (h == INVALID_HANDLE_VALUE) {
+    const DWORD err = GetLastError();
+    if (err == ERROR_FILE_NOT_FOUND || err == ERROR_PATH_NOT_FOUND) {
+      out->set_path(bytes_path);
+      out->set_type(FileType::NonExistent);
+      out->set_mtime(kNoTime);
+      out->set_size(kNoSize);
+      return Status::OK();
+    }
+    // End the prefix with "': " so the system message doesn't run into the
+    // quoted path (same convention as Move() / CopyFile()).
+    return WinErrorToStatus("Failed querying information for path '", bytes_path,
+                            "': ");
+  }
+
+  BY_HANDLE_FILE_INFORMATION info;
+  if (!GetFileInformationByHandle(h, &info)) {
+    // Build the status before CloseHandle() can overwrite the last error.
+    Status st =
+        WinErrorToStatus("Failed querying information for path '", bytes_path, "': ");
+    CloseHandle(h);
+    return st;
+  }
+  CloseHandle(h);
+  *out = FileInformationToFileStat(info);
+  out->set_path(bytes_path);
+  return Status::OK();
+}
+
+#else // POSIX systems
+
+// Convert a POSIX timespec (seconds + nanoseconds since the Unix epoch) into a
+// TimePoint.
+TimePoint ToTimePoint(const struct timespec& s) {
+  const std::chrono::nanoseconds since_epoch =
+      std::chrono::seconds(s.tv_sec) + std::chrono::nanoseconds(s.tv_nsec);
+  return TimePoint(std::chrono::duration_cast<TimePoint::duration>(since_epoch));
+}
+
+// Translate a POSIX `struct stat` into a FileStats (the path is not set here).
+// Entries that are neither regular files nor directories are FileType::Unknown;
+// only regular files carry a size.
+FileStats StatToFileStat(const struct stat& s) {
+  FileStats st;
+  const bool is_file = S_ISREG(s.st_mode);
+  if (is_file) {
+    st.set_type(FileType::File);
+  } else {
+    st.set_type(S_ISDIR(s.st_mode) ? FileType::Directory : FileType::Unknown);
+  }
+  st.set_size(is_file ? static_cast<int64_t>(s.st_size) : kNoSize);
+#ifdef __APPLE__
+  // macOS doesn't use the POSIX-compliant spelling
+  const struct timespec& modified = s.st_mtimespec;
+#else
+  const struct timespec& modified = s.st_mtim;
+#endif
+  st.set_mtime(ToTimePoint(modified));
+  return st;
+}
+
+// Stat `path` with stat(), i.e. following symlinks.  When the path doesn't
+// resolve to an entry (ENOENT, ENOTDIR, ELOOP) the result is
+// FileType::NonExistent with OK status; other errors become an IOError whose
+// message ends with the errno description.
+Status StatFile(const std::string& path, FileStats* out) {
+  struct stat s;
+  if (stat(path.c_str(), &s) == -1) {
+    const int errnum = errno;
+    if (errnum != ENOENT && errnum != ENOTDIR && errnum != ELOOP) {
+      // End the prefix with "': " so strerror text doesn't run into the quoted
+      // path (same convention as Move() / CopyFile()).
+      return ErrnoToStatus("Failed stat()ing path '", path, "': ");
+    }
+    out->set_type(FileType::NonExistent);
+    out->set_mtime(kNoTime);
+    out->set_size(kNoSize);
+  } else {
+    *out = StatToFileStat(s);
+  }
+  out->set_path(path);
+  return Status::OK();
+}
+
+#endif
+
+// Append the stats of the entries under `path` to `out`, descending into
+// subdirectories when select.recursive is set.  Entries that StatFile reports
+// as NonExistent (e.g. removed after being listed) are skipped.  If
+// select.allow_non_existent is set and `path` itself is missing, nothing is
+// appended; otherwise Boost errors are reported as IOError.
+Status StatSelector(const NativePathString& path, const Selector& select,
+                    std::vector<FileStats>* out) {
+  bfs::path p(path);
+
+  if (select.allow_non_existent) {
+    bfs::file_status st;
+    BOOST_FILESYSTEM_TRY
+    st = bfs::status(p);
+    BOOST_FILESYSTEM_CATCH
+    if (st.type() == bfs::file_not_found) {
+      return Status::OK();
+    }
+  }
+
+  BOOST_FILESYSTEM_TRY
+  for (const auto& entry : bfs::directory_iterator(p)) {
+    FileStats st;
+    NativePathString ns = entry.path().native();
+    RETURN_NOT_OK(StatFile(ns, &st));
+    // Decide whether to recurse *before* `st` is moved into `out`: the state
+    // of a moved-from FileStats must not be relied upon.
+    const bool descend = select.recursive && st.type() == FileType::Directory;
+    if (st.type() != FileType::NonExistent) {
+      out->push_back(std::move(st));
+    }
+    if (descend) {
+      RETURN_NOT_OK(StatSelector(ns, select, out));
+    }
+  }
+  BOOST_FILESYSTEM_CATCH
+
+  return Status::OK();
+}
+
+} // namespace
+
+// LocalFileSystem holds no state of its own; nothing to set up or tear down.
+LocalFileSystem::LocalFileSystem() = default;
+
+LocalFileSystem::~LocalFileSystem() = default;
+
+Status LocalFileSystem::GetTargetStats(const std::string& path, FileStats* out) {
+  // Validate the path and convert it to the platform's native representation.
+  PlatformFilename native_path;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &native_path));
+  return StatFile(native_path.ToNative(), out);
+}
+
+Status LocalFileSystem::GetTargetStats(const Selector& select,
+                                       std::vector<FileStats>* out) {
+  PlatformFilename base_dir;
+  RETURN_NOT_OK(PlatformFilename::FromString(select.base_dir, &base_dir));
+  // StatSelector() only appends, so start from an empty result.
+  out->clear();
+  return StatSelector(base_dir.ToNative(), select, out);
+}
+
+Status LocalFileSystem::CreateDir(const std::string& path, bool recursive) {
+  PlatformFilename dir;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &dir));
+  // Non-recursive creation only makes the leaf; otherwise delegate to
+  // CreateDirTree().
+  if (!recursive) {
+    return ::arrow::internal::CreateDir(dir);
+  }
+  return ::arrow::internal::CreateDirTree(dir);
+}
+
+Status LocalFileSystem::DeleteDir(const std::string& path) {
+  PlatformFilename dir;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &dir));
+  bool deleted = false;
+  RETURN_NOT_OK(::arrow::internal::DeleteDirTree(dir, &deleted));
+  if (!deleted) {
+    // Nothing was removed: treat the missing directory as an error.
+    return Status::IOError("Directory does not exist: '", path, "'");
+  }
+  return Status::OK();
+}
+
+Status LocalFileSystem::DeleteFile(const std::string& path) {
+  PlatformFilename file;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &file));
+  bool deleted = false;
+  RETURN_NOT_OK(::arrow::internal::DeleteFile(file, &deleted));
+  if (!deleted) {
+    // Nothing was removed: treat the missing file as an error.
+    return Status::IOError("File does not exist: '", path, "'");
+  }
+  return Status::OK();
+}
+
+Status LocalFileSystem::Move(const std::string& src, const std::string& dest) {
+  PlatformFilename from, to;
+  RETURN_NOT_OK(PlatformFilename::FromString(src, &from));
+  RETURN_NOT_OK(PlatformFilename::FromString(dest, &to));
+
+#ifdef _WIN32
+  // MOVEFILE_REPLACE_EXISTING allows an existing destination to be replaced.
+  const BOOL moved = MoveFileExW(from.ToNative().c_str(), to.ToNative().c_str(),
+                                 MOVEFILE_REPLACE_EXISTING);
+  if (!moved) {
+    return WinErrorToStatus("Failed renaming '", from.ToString(), "' to '",
+                            to.ToString(), "': ");
+  }
+#else
+  const int rc = rename(from.ToNative().c_str(), to.ToNative().c_str());
+  if (rc == -1) {
+    return ErrnoToStatus("Failed renaming '", from.ToString(), "' to '",
+                         to.ToString(), "': ");
+  }
+#endif
+  return Status::OK();
+}
+
+Status LocalFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  PlatformFilename from, to;
+  RETURN_NOT_OK(PlatformFilename::FromString(src, &from));
+  RETURN_NOT_OK(PlatformFilename::FromString(dest, &to));
+  // Copying a path onto itself is a no-op.  Only byte-identical native paths
+  // are detected; other spellings of the same file are not.
+  // XXX should we use fstat() to compare inodes?
+  if (from.ToNative() == to.ToNative()) {
+    return Status::OK();
+  }
+
+#ifdef _WIN32
+  const BOOL copied = CopyFileW(from.ToNative().c_str(), to.ToNative().c_str(),
+                                FALSE /* bFailIfExists */);
+  if (!copied) {
+    return WinErrorToStatus("Failed copying '", from.ToString(), "' to '",
+                            to.ToString(), "': ");
+  }
+  return Status::OK();
+#else
+  // Stream the contents through in 1 MiB chunks.
+  std::shared_ptr<io::InputStream> input;
+  std::shared_ptr<io::OutputStream> output;
+  RETURN_NOT_OK(OpenInputStream(src, &input));
+  RETURN_NOT_OK(OpenOutputStream(dest, &output));
+  RETURN_NOT_OK(internal::CopyStream(input, output, 1024 * 1024 /* chunk_size */));
+  RETURN_NOT_OK(output->Close());
+  return input->Close();
+#endif
+}
+
+Status LocalFileSystem::OpenInputStream(const std::string& path,
+                                        std::shared_ptr<io::InputStream>* out) {
+  // io::ReadableFile serves as the sequential input stream.
+  std::shared_ptr<io::ReadableFile> readable;
+  RETURN_NOT_OK(io::ReadableFile::Open(path, &readable));
+  *out = std::move(readable);
+  return Status::OK();
+}
+
+Status LocalFileSystem::OpenInputFile(const std::string& path,
+                                      std::shared_ptr<io::RandomAccessFile>* out) {
+  // io::ReadableFile also provides random access.
+  std::shared_ptr<io::ReadableFile> readable;
+  RETURN_NOT_OK(io::ReadableFile::Open(path, &readable));
+  *out = std::move(readable);
+  return Status::OK();
+}
+
+namespace {
+
+// Open `path` write-only through a raw file descriptor (honouring `truncate`
+// and `append`) and wrap it in an io::FileOutputStream.  If wrapping fails the
+// descriptor is closed so it doesn't leak.
+Status OpenOutputStreamGeneric(const std::string& path, bool truncate, bool append,
+                               std::shared_ptr<io::OutputStream>* out) {
+  PlatformFilename fn;
+  RETURN_NOT_OK(PlatformFilename::FromString(path, &fn));
+
+  const bool write_only = true;
+  int fd = -1;
+  RETURN_NOT_OK(
+      ::arrow::internal::FileOpenWritable(fn, write_only, truncate, append, &fd));
+
+  Status st = io::FileOutputStream::Open(fd, out);
+  if (st.ok()) {
+    return st;
+  }
+  // Best-effort cleanup; the original error in `st` is what gets reported.
+  ARROW_UNUSED(::arrow::internal::FileClose(fd));
+  return st;
+}
+
+} // namespace
+
+Status LocalFileSystem::OpenOutputStream(const std::string& path,
+                                         std::shared_ptr<io::OutputStream>* out) {
+  // Overwrite: discard any existing contents, write from the start.
+  return OpenOutputStreamGeneric(path, /*truncate=*/true, /*append=*/false, out);
+}
+
+Status LocalFileSystem::OpenAppendStream(const std::string& path,
+                                         std::shared_ptr<io::OutputStream>* out) {
+  // Append: keep existing contents and write after them.
+  return OpenOutputStreamGeneric(path, /*truncate=*/false, /*append=*/true, out);
+}
+
+} // namespace fs
+} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h
new file mode 100644
index 0000000..c720ac2
--- /dev/null
+++ b/cpp/src/arrow/filesystem/localfs.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/filesystem/filesystem.h"
+
+namespace arrow {
+namespace fs {
+
+/// \brief EXPERIMENTAL: a FileSystem implementation accessing files
+/// on the local machine.
+///
+/// Details such as symlinks are abstracted away (symlinks are always followed,
+/// except when deleting an entry).
+class ARROW_EXPORT LocalFileSystem : public FileSystem {
+ public:
+ LocalFileSystem();
+ ~LocalFileSystem() override;
+
+ // Keep the base class's other GetTargetStats overload(s) visible next to the
+ // two overridden below.
+ using FileSystem::GetTargetStats;
+ /// Stat a single path.  A missing path is reported as FileType::NonExistent
+ /// rather than as an error.
+ Status GetTargetStats(const std::string& path, FileStats* out) override;
+ /// List the entries under select.base_dir (recursively if select.recursive).
+ /// `out` is cleared first.
+ Status GetTargetStats(const Selector& select, std::vector<FileStats>* out) override;
+
+ /// Create a directory.  With `recursive` (the default) missing parent
+ /// directories are created as well.
+ Status CreateDir(const std::string& path, bool recursive = true) override;
+
+ /// Delete a directory tree.  Fails with IOError if nothing was deleted.
+ Status DeleteDir(const std::string& path) override;
+
+ /// Delete a file.  Fails with IOError if nothing was deleted.
+ Status DeleteFile(const std::string& path) override;
+
+ /// Rename `src` to `dest` (rename() on POSIX; MoveFileExW with
+ /// MOVEFILE_REPLACE_EXISTING on Windows).
+ Status Move(const std::string& src, const std::string& dest) override;
+
+ /// Copy a file's contents to `dest`.  Copying a path onto the identical path
+ /// is a no-op.
+ Status CopyFile(const std::string& src, const std::string& dest) override;
+
+ /// Open a file for sequential reading.
+ Status OpenInputStream(const std::string& path,
+ std::shared_ptr<io::InputStream>* out) override;
+
+ /// Open a file for random-access reading.
+ Status OpenInputFile(const std::string& path,
+ std::shared_ptr<io::RandomAccessFile>* out) override;
+
+ /// Open a file for writing, truncating any existing contents.
+ Status OpenOutputStream(const std::string& path,
+ std::shared_ptr<io::OutputStream>* out) override;
+
+ /// Open a file for writing in append mode, keeping existing contents.
+ Status OpenAppendStream(const std::string& path,
+ std::shared_ptr<io::OutputStream>* out) override;
+};
+
+} // namespace fs
+} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/mockfs.cc b/cpp/src/arrow/filesystem/mockfs.cc
index 8b4ef2a..a5f936f 100644
--- a/cpp/src/arrow/filesystem/mockfs.cc
+++ b/cpp/src/arrow/filesystem/mockfs.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <algorithm>
#include <iterator>
#include <map>
#include <string>
@@ -529,9 +530,23 @@ Status MockFileSystem::Move(const std::string& src, const std::string& dest) {
if (op.src_entry == nullptr) {
return PathNotFound(src);
}
- if (op.dest_entry != nullptr && op.dest_entry->is_dir()) {
- return Status::IOError("Cannot replace destination '", dest,
- "', which is a directory");
+ if (op.dest_entry != nullptr) {
+ if (op.dest_entry->is_dir()) {
+ return Status::IOError("Cannot replace destination '", dest,
+ "', which is a directory");
+ }
+ if (op.dest_entry->is_file() && op.src_entry->is_dir()) {
+ return Status::IOError("Cannot replace destination '", dest,
+ "', which is a file, with directory '", src, "'");
+ }
+ }
+ if (op.src_parts.size() < op.dest_parts.size()) {
+ // Check if dest is a child of src
+ auto p =
+ std::mismatch(op.src_parts.begin(), op.src_parts.end(), op.dest_parts.begin());
+ if (p.first == op.src_parts.end()) {
+ return Status::IOError("Cannot move '", src, "' into child path '", dest, "'");
+ }
}
// Move original entry, fix its name
diff --git a/cpp/src/arrow/filesystem/path-util.cc b/cpp/src/arrow/filesystem/path-util.cc
index bbab3ab..c30e6b2 100644
--- a/cpp/src/arrow/filesystem/path-util.cc
+++ b/cpp/src/arrow/filesystem/path-util.cc
@@ -23,7 +23,7 @@ namespace arrow {
namespace fs {
namespace internal {
-static constexpr char kSep = '/';
+// XXX How does this encode Windows UNC paths?
std::vector<std::string> SplitAbstractPath(const std::string& path) {
std::vector<std::string> parts;
@@ -81,7 +81,23 @@ Status ValidateAbstractPathParts(const std::vector<std::string>& parts) {
std::string ConcatAbstractPath(const std::string& base, const std::string& stem) {
DCHECK(!stem.empty());
- return base.empty() ? stem : base + kSep + stem;
+ // Join with exactly one separator: an empty base yields `stem` as-is, and no
+ // extra kSep is inserted when `base` already ends with one.
+ if (base.empty()) {
+ return stem;
+ } else if (base.back() == kSep) {
+ return base + stem;
+ } else {
+ return base + kSep + stem;
+ }
+}
+
+std::string EnsureTrailingSlash(const std::string& s) {
+  // Empty strings and strings already ending in kSep are returned unchanged.
+  // XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"...
+  // Unless the local filesystem always uses absolute paths
+  if (s.empty() || s.back() == kSep) {
+    return s;
+  }
+  return s + kSep;
}
} // namespace internal
diff --git a/cpp/src/arrow/filesystem/path-util.h b/cpp/src/arrow/filesystem/path-util.h
index 85b81c2..444451d 100644
--- a/cpp/src/arrow/filesystem/path-util.h
+++ b/cpp/src/arrow/filesystem/path-util.h
@@ -27,6 +27,8 @@ namespace arrow {
namespace fs {
namespace internal {
+constexpr char kSep = '/';
+
// Computations on abstract paths (not local paths with system-dependent behaviour).
// Abstract paths are typically used in URIs.
@@ -47,13 +49,16 @@ Status ValidateAbstractPathParts(const std::vector<std::string>& parts);
ARROW_EXPORT
std::string ConcatAbstractPath(const std::string& base, const std::string& stem);
+// Return `s` with a trailing kSep ('/') appended, unless `s` is empty or
+// already ends with one.
+ARROW_EXPORT
+std::string EnsureTrailingSlash(const std::string& s);
+
// Join the components of an abstract path.
template <class StringIt>
std::string JoinAbstractPath(StringIt it, StringIt end) {
std::string path;
for (; it != end; ++it) {
if (!path.empty()) {
- path += '/';
+ path += kSep;
}
path += *it;
}
diff --git a/cpp/src/arrow/filesystem/test-util.cc b/cpp/src/arrow/filesystem/test-util.cc
index d87fbc5..27fc69b 100644
--- a/cpp/src/arrow/filesystem/test-util.cc
+++ b/cpp/src/arrow/filesystem/test-util.cc
@@ -32,8 +32,6 @@ namespace fs {
namespace {
-static constexpr double kTimeSlack = 2.0; // In seconds
-
std::vector<FileStats> GetAllWithType(FileSystem* fs, FileType type) {
Selector selector;
selector.base_dir = "";
@@ -57,20 +55,12 @@ std::vector<FileStats> GetAllFiles(FileSystem* fs) {
return GetAllWithType(fs, FileType::File);
}
-// Sort of vector of FileStats by lexicographic path order
-void SortStats(std::vector<FileStats>* stats) {
- std::sort(stats->begin(), stats->end(),
- [](const FileStats& left, const FileStats& right) -> bool {
- return left.path() < right.path();
- });
-}
-
void AssertPaths(const std::vector<FileStats>& stats,
const std::vector<std::string>& expected_paths) {
auto sorted_stats = stats;
SortStats(&sorted_stats);
- std::vector<std::string> paths(stats.size());
- std::transform(stats.begin(), stats.end(), paths.begin(),
+ std::vector<std::string> paths(sorted_stats.size());
+ std::transform(sorted_stats.begin(), sorted_stats.end(), paths.begin(),
[&](const FileStats& st) { return st.path(); });
ASSERT_EQ(paths, expected_paths);
@@ -88,6 +78,30 @@ Status WriteString(io::OutputStream* stream, const std::string& s) {
return stream->Write(s.data(), static_cast<int64_t>(s.length()));
}
+// Assert that `path` is a regular file whose entire contents equal
+// `expected_data`, and that reading past them yields nothing.
+void AssertFileContents(FileSystem* fs, const std::string& path,
+                        const std::string& expected_data) {
+  const auto expected_size = static_cast<int64_t>(expected_data.length());
+  FileStats st;
+  ASSERT_OK(fs->GetTargetStats(path, &st));
+  ASSERT_EQ(st.type(), FileType::File) << "For path '" << path << "'";
+  ASSERT_EQ(st.size(), expected_size) << "For path '" << path << "'";
+
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK(fs->OpenInputStream(path, &stream));
+
+  std::shared_ptr<Buffer> contents;
+  ASSERT_OK(stream->Read(st.size(), &contents));
+  AssertBufferEqual(*contents, expected_data);
+
+  // The stream must now be exhausted: one more read returns zero bytes.
+  std::shared_ptr<Buffer> leftover;
+  ASSERT_OK(stream->Read(1, &leftover));
+  ASSERT_EQ(leftover->size(), 0);
+
+  ASSERT_OK(stream->Close());
+}
+
+// Sanity-check a reported mtime: it must not lie before the clock's epoch.
+void ValidateTimePoint(TimePoint tp) {
+  const auto ticks = tp.time_since_epoch().count();
+  ASSERT_GE(ticks, 0);
+}
+
+}; // namespace
+
void CreateFile(FileSystem* fs, const std::string& path, const std::string& data) {
std::shared_ptr<io::OutputStream> stream;
ASSERT_OK(fs->OpenOutputStream(path, &stream));
@@ -95,12 +109,31 @@ void CreateFile(FileSystem* fs, const std::string& path, const std::string& data
ASSERT_OK(stream->Close());
}
+void SortStats(std::vector<FileStats>* stats) {
+  // Lexicographic order on path() (std::string operator<).
+  auto by_path = [](const FileStats& left, const FileStats& right) {
+    return left.path() < right.path();
+  };
+  std::sort(stats->begin(), stats->end(), by_path);
+}
+
void AssertFileStats(const FileStats& st, const std::string& path, FileType type) {
ASSERT_EQ(st.path(), path);
ASSERT_EQ(st.type(), type);
}
void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+ TimePoint mtime) {
+ AssertFileStats(st, path, type);
+ ASSERT_EQ(st.mtime(), mtime);
+}
+
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+ TimePoint mtime, int64_t size) {
+ AssertFileStats(st, path, type, mtime);
+ ASSERT_EQ(st.size(), size);
+}
+
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
int64_t size) {
AssertFileStats(st, path, type);
ASSERT_EQ(st.size(), size);
@@ -113,36 +146,6 @@ void AssertFileStats(FileSystem* fs, const std::string& path, Args&&... args) {
AssertFileStats(st, path, std::forward<Args>(args)...);
}
-void AssertFileContents(FileSystem* fs, const std::string& path,
- const std::string& expected_data) {
- FileStats st;
- ASSERT_OK(fs->GetTargetStats(path, &st));
- ASSERT_EQ(st.type(), FileType::File);
- ASSERT_EQ(st.size(), static_cast<int64_t>(expected_data.length()));
-
- std::shared_ptr<io::InputStream> stream;
- std::shared_ptr<Buffer> buffer, leftover;
- ASSERT_OK(fs->OpenInputStream(path, &stream));
- ASSERT_OK(stream->Read(st.size(), &buffer));
- AssertBufferEqual(*buffer, expected_data);
- // No data left in stream
- ASSERT_OK(stream->Read(1, &leftover));
- ASSERT_EQ(leftover->size(), 0);
-
- ASSERT_OK(stream->Close());
-}
-
-void ValidateTimePoint(TimePoint tp) { ASSERT_GE(tp.time_since_epoch().count(), 0); }
-
-template <typename Duration>
-void AssertDurationBetween(Duration d, double min_secs, double max_secs) {
- auto seconds = std::chrono::duration_cast<std::chrono::duration<double>>(d);
- ASSERT_GE(seconds.count(), min_secs);
- ASSERT_LE(seconds.count(), max_secs);
-}
-
-}; // namespace
-
////////////////////////////////////////////////////////////////////////////
// GenericFileSystemTest implementation
@@ -351,31 +354,31 @@ void GenericFileSystemTest::TestMoveDir(FileSystem* fs) {
AssertAllDirs(fs, {"EF", "KL", "KL/CD"});
AssertAllFiles(fs, {"EF/ghi", "KL/CD/def", "KL/abc"});
- // Destination is a file => clobber
- CreateFile(fs, "MN", "");
- ASSERT_OK(fs->Move("KL", "MN"));
- AssertAllDirs(fs, {"EF", "MN", "MN/CD"});
- AssertAllFiles(fs, {"EF/ghi", "MN/CD/def", "MN/abc"});
+ // Overwrite file with directory => untested (implementation-dependent)
// Identical source and destination: allowed to succeed or raise IOError,
// but should not lose data.
- Status st = fs->Move("MN", "MN");
+ Status st = fs->Move("KL", "KL");
if (!st.ok()) {
ASSERT_RAISES(IOError, st);
}
- AssertAllDirs(fs, {"EF", "MN", "MN/CD"});
- AssertAllFiles(fs, {"EF/ghi", "MN/CD/def", "MN/abc"});
+ AssertAllDirs(fs, {"EF", "KL", "KL/CD"});
+ AssertAllFiles(fs, {"EF/ghi", "KL/CD/def", "KL/abc"});
- // Destination is a directory
- ASSERT_RAISES(IOError, fs->Move("MN", "EF"));
- AssertAllDirs(fs, {"EF", "MN", "MN/CD"});
- AssertAllFiles(fs, {"EF/ghi", "MN/CD/def", "MN/abc"});
+ // Destination is a non-empty directory
+ ASSERT_RAISES(IOError, fs->Move("KL", "EF"));
+ AssertAllDirs(fs, {"EF", "KL", "KL/CD"});
+ AssertAllFiles(fs, {"EF/ghi", "KL/CD/def", "KL/abc"});
+
+ // Cannot move directory inside itself
+ ASSERT_RAISES(IOError, fs->Move("KL", "KL/ZZ"));
// (other errors tested in TestMoveFile)
- // File contents didn't change
- AssertFileContents(fs, "MN/abc", "abc data");
- AssertFileContents(fs, "MN/CD/def", "def data");
+ // Contents didn't change
+ AssertAllDirs(fs, {"EF", "KL", "KL/CD"});
+ AssertFileContents(fs, "KL/abc", "abc data");
+ AssertFileContents(fs, "KL/CD/def", "def data");
}
void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
@@ -417,7 +420,7 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
AssertFileContents(fs, "def", "other data");
- // Destination is a directory
+ // Destination is a non-empty directory
ASSERT_RAISES(IOError, fs->CopyFile("def", "AB"));
// Source doesn't exist
ASSERT_RAISES(IOError, fs->CopyFile("abc", "xxx"));
@@ -430,7 +433,7 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
}
void GenericFileSystemTest::TestGetTargetStatsSingle(FileSystem* fs) {
- ASSERT_OK(fs->CreateDir("AB/CD"));
+ ASSERT_OK(fs->CreateDir("AB/CD/EF"));
CreateFile(fs, "AB/CD/ghi", "some data");
FileStats st;
@@ -442,11 +445,12 @@ void GenericFileSystemTest::TestGetTargetStatsSingle(FileSystem* fs) {
first_time = st.mtime();
ValidateTimePoint(first_time);
- ASSERT_OK(fs->GetTargetStats("AB/CD", &st));
- AssertFileStats(st, "AB/CD", FileType::Directory);
- ASSERT_EQ(st.base_name(), "CD");
+ ASSERT_OK(fs->GetTargetStats("AB/CD/EF", &st));
+ AssertFileStats(st, "AB/CD/EF", FileType::Directory);
+ ASSERT_EQ(st.base_name(), "EF");
ASSERT_EQ(st.size(), kNoSize);
- // AB/CD was created a little after AB
+ // AB/CD's creation can impact AB's modification time, however, AB/CD/EF's
+ // creation doesn't, so AB/CD/EF's mtime should be after AB's.
AssertDurationBetween(st.mtime() - first_time, 0.0, kTimeSlack);
ASSERT_OK(fs->GetTargetStats("AB/CD/ghi", &st));
@@ -459,10 +463,6 @@ void GenericFileSystemTest::TestGetTargetStatsSingle(FileSystem* fs) {
ASSERT_EQ(st.base_name(), "zz");
ASSERT_EQ(st.size(), kNoSize);
ASSERT_EQ(st.mtime(), kNoTime);
-
- // Invalid path
- // XXX will this really be rejected by all filesystems?
- ASSERT_RAISES(Invalid, fs->GetTargetStats("//foo//bar//baz//", &st));
}
void GenericFileSystemTest::TestGetTargetStatsVector(FileSystem* fs) {
@@ -491,11 +491,6 @@ void GenericFileSystemTest::TestGetTargetStatsVector(FileSystem* fs) {
ASSERT_OK(fs->GetTargetStats("AB", &st));
AssertFileStats(st, "AB", FileType::Directory);
ASSERT_EQ(st.mtime(), first_time);
-
- // Invalid path
- // XXX will this really be rejected by all filesystems?
- ASSERT_RAISES(Invalid,
- fs->GetTargetStats({"AB", "AB/CD", "//foo//bar//baz//"}, &stats));
}
void GenericFileSystemTest::TestGetTargetStatsSelector(FileSystem* fs) {
@@ -603,6 +598,8 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) {
AssertAllFiles(fs, {"CD/ghi", "abc"});
AssertFileContents(fs, "CD/ghi", "overwritten");
+ ASSERT_RAISES(Invalid, WriteString(stream.get(), "x")); // Stream is closed
+
// Cannot turn dir into file
ASSERT_RAISES(IOError, fs->OpenOutputStream("CD", &stream));
AssertAllDirs(fs, {"CD"});
@@ -632,6 +629,8 @@ void GenericFileSystemTest::TestOpenAppendStream(FileSystem* fs) {
AssertAllDirs(fs, {});
AssertAllFiles(fs, {"abc"});
AssertFileContents(fs, "abc", "some data appended");
+
+ ASSERT_RAISES(Invalid, WriteString(stream.get(), "x")); // Stream is closed
}
void GenericFileSystemTest::TestOpenInputStream(FileSystem* fs) {
diff --git a/cpp/src/arrow/filesystem/test-util.h b/cpp/src/arrow/filesystem/test-util.h
index 8230501..179b08c 100644
--- a/cpp/src/arrow/filesystem/test-util.h
+++ b/cpp/src/arrow/filesystem/test-util.h
@@ -17,13 +17,47 @@
#pragma once
+#include <chrono>
#include <memory>
+#include <string>
+#include <vector>
#include "arrow/filesystem/filesystem.h"
namespace arrow {
namespace fs {
+static constexpr double kTimeSlack = 2.0; // In seconds
+
+ARROW_EXPORT
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type);
+
+ARROW_EXPORT
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+ TimePoint mtime);
+
+ARROW_EXPORT
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+ TimePoint mtime, int64_t size);
+
+ARROW_EXPORT
+void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
+ int64_t size);
+
+ARROW_EXPORT
+void CreateFile(FileSystem* fs, const std::string& path, const std::string& data);
+
+// Sort of vector of FileStats by lexicographic path order
+ARROW_EXPORT
+void SortStats(std::vector<FileStats>* stats);
+
+// Assert that the std::chrono duration `d`, expressed in seconds, lies within
+// [min_secs, max_secs].
+// NOTE(review): this uses gtest ASSERT_* macros but this header doesn't include
+// gtest, so includers must include it first.  A failing ASSERT only returns
+// from this helper, not from the calling test body.
+template <typename Duration>
+void AssertDurationBetween(Duration d, double min_secs, double max_secs) {
+  const double secs = std::chrono::duration<double>(d).count();
+  ASSERT_GE(secs, min_secs);
+  ASSERT_LE(secs, max_secs);
+}
+
// Generic tests for FileSystem implementations.
// To use this class, subclass both from it and ::testing::Test,
// implement GetEmptyFileSystem(), and use GENERIC_FS_TEST_FUNCTIONS()
diff --git a/cpp/src/arrow/filesystem/util-internal.cc b/cpp/src/arrow/filesystem/util-internal.cc
new file mode 100644
index 0000000..d097f91
--- /dev/null
+++ b/cpp/src/arrow/filesystem/util-internal.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/filesystem/util-internal.h"
+#include "arrow/buffer.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+// Copy everything remaining in `src` to `dest`, reading at most `chunk_size`
+// bytes at a time, until `src` returns a 0-byte read (EOF).  Neither stream is
+// closed.
+Status CopyStream(const std::shared_ptr<io::InputStream>& src,
+                  const std::shared_ptr<io::OutputStream>& dest, int64_t chunk_size) {
+  // With chunk_size == 0 every Read() returns 0 bytes, which is
+  // indistinguishable from EOF, so nothing would be copied yet OK returned.
+  // Negative sizes are meaningless.  Reject both up front.
+  if (chunk_size <= 0) {
+    return Status::Invalid("CopyStream: chunk_size must be positive, got ",
+                           chunk_size);
+  }
+
+  std::shared_ptr<Buffer> chunk;
+  RETURN_NOT_OK(AllocateBuffer(chunk_size, &chunk));
+
+  while (true) {
+    int64_t bytes_read = 0;
+    RETURN_NOT_OK(src->Read(chunk_size, &bytes_read, chunk->mutable_data()));
+    if (bytes_read == 0) {
+      // EOF
+      break;
+    }
+    RETURN_NOT_OK(dest->Write(chunk->data(), bytes_read));
+  }
+
+  return Status::OK();
+}
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/util-internal.h b/cpp/src/arrow/filesystem/util-internal.h
new file mode 100644
index 0000000..eabdad4
--- /dev/null
+++ b/cpp/src/arrow/filesystem/util-internal.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace fs {
+namespace internal {
+
+/// \brief Copy all remaining bytes from `src` to `dest`, reading at most
+/// `chunk_size` bytes at a time until `src` reports end-of-stream.
+///
+/// Neither stream is closed by this function.
+ARROW_EXPORT
+Status CopyStream(const std::shared_ptr<io::InputStream>& src,
+ const std::shared_ptr<io::OutputStream>& dest, int64_t chunk_size);
+
+} // namespace internal
+} // namespace fs
+} // namespace arrow
diff --git a/cpp/src/arrow/io/file-test.cc b/cpp/src/arrow/io/file-test.cc
index c548785..1f0336e 100644
--- a/cpp/src/arrow/io/file-test.cc
+++ b/cpp/src/arrow/io/file-test.cc
@@ -145,6 +145,7 @@ TEST_F(TestFileOutputStream, Close) {
ASSERT_OK(file_->Close());
ASSERT_TRUE(file_->closed());
ASSERT_TRUE(FileIsClosed(fd));
+ ASSERT_RAISES(Invalid, file_->Write(data, strlen(data)));  // writes after Close() fail
// Idempotent
ASSERT_OK(file_->Close());
@@ -158,6 +159,7 @@ TEST_F(TestFileOutputStream, Close) {
ASSERT_OK(stream_->Close());
ASSERT_TRUE(stream_->closed());
ASSERT_TRUE(FileIsClosed(fd));
+ ASSERT_RAISES(Invalid, stream_->Write(data, strlen(data)));  // writes after Close() fail
// Idempotent
ASSERT_OK(stream_->Close());
@@ -409,6 +411,9 @@ TEST_F(TestReadableFile, Read) {
ASSERT_OK(file_->Seek(1));
ASSERT_OK(file_->Read(size, &buf));
ASSERT_EQ(size - 1, buf->size());
+ // Reading from a closed file must fail with Invalid
+ ASSERT_OK(file_->Close());
+ ASSERT_RAISES(Invalid, file_->Read(1, &buf));
}
TEST_F(TestReadableFile, ReadAt) {
@@ -436,6 +441,9 @@ TEST_F(TestReadableFile, ReadAt) {
Buffer expected(reinterpret_cast<const uint8_t*>(test_data + 2), 5);
ASSERT_TRUE(buffer2->Equals(expected));
+ // Positional reads on a closed file must fail with Invalid
+ ASSERT_OK(file_->Close());
+ ASSERT_RAISES(Invalid, file_->ReadAt(0, 1, &buffer2));
}
TEST_F(TestReadableFile, NonExistentFile) {
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 7c347f1..e678ca2 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -113,35 +113,52 @@ class OSFile {
return Status::OK();
}
+ Status CheckClosed() const {  // Invalid unless the file is currently open
+ if (!is_open_) {
+ return Status::Invalid("Invalid operation on closed file");
+ }
+ return Status::OK();
+ }
+
Status Close() {
if (is_open_) {
// Even if closing fails, the fd will likely be closed (perhaps it's
// already closed).
is_open_ = false;
- RETURN_NOT_OK(internal::FileClose(fd_));
+ int fd = fd_;
+ fd_ = -1;  // forget the descriptor first: the OS may reuse its number after close
+ RETURN_NOT_OK(internal::FileClose(fd));
}
return Status::OK();
}
Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+ RETURN_NOT_OK(CheckClosed());
return internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes, bytes_read);
}
Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+ RETURN_NOT_OK(CheckClosed());
return internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position, nbytes,
bytes_read);
}
Status Seek(int64_t pos) {
+ RETURN_NOT_OK(CheckClosed());
if (pos < 0) {
return Status::Invalid("Invalid position");
}
return internal::FileSeek(fd_, pos);
}
- Status Tell(int64_t* pos) const { return internal::FileTell(fd_, pos); }
+ Status Tell(int64_t* pos) const {
+ RETURN_NOT_OK(CheckClosed());
+ return internal::FileTell(fd_, pos);
+ }
Status Write(const void* data, int64_t length) {
+ RETURN_NOT_OK(CheckClosed());
+ // NOTE(review): Close() does not take lock_, so this check can race with Close()
std::lock_guard<std::mutex> guard(lock_);
if (length < 0) {
return Status::IOError("Length must be non-negative");
diff --git a/cpp/src/arrow/util/io-util-test.cc b/cpp/src/arrow/util/io-util-test.cc
index 0f4f6ca..68d3f30 100644
--- a/cpp/src/arrow/util/io-util-test.cc
+++ b/cpp/src/arrow/util/io-util-test.cc
@@ -35,6 +35,48 @@ void AssertNotExists(const PlatformFilename& path) {
ASSERT_FALSE(exists) << "Path '" << path.ToString() << "' exists";
}
+TEST(PlatformFilename, RoundtripAscii) {
+ PlatformFilename fn;
+ ASSERT_OK(PlatformFilename::FromString("a/b", &fn));
+ ASSERT_EQ(fn.ToString(), "a/b");
+#if _WIN32
+ ASSERT_EQ(fn.ToNative(), L"a\\b");
+#else
+ ASSERT_EQ(fn.ToNative(), "a/b");
+#endif
+}
+
+TEST(PlatformFilename, RoundtripUtf8) {
+ PlatformFilename fn;
+ ASSERT_OK(PlatformFilename::FromString("h\xc3\xa9h\xc3\xa9", &fn));
+ ASSERT_EQ(fn.ToString(), "h\xc3\xa9h\xc3\xa9");
+#if _WIN32
+ ASSERT_EQ(fn.ToNative(), L"h\u00e9h\u00e9");
+#else
+ ASSERT_EQ(fn.ToNative(), "h\xc3\xa9h\xc3\xa9");
+#endif
+}
+
+#if _WIN32
+TEST(PlatformFilename, Separators) {
+ PlatformFilename fn;
+ ASSERT_OK(PlatformFilename::FromString("C:/foo/bar", &fn));
+ ASSERT_EQ(fn.ToString(), "C:/foo/bar");
+ ASSERT_EQ(fn.ToNative(), L"C:\\foo\\bar");
+
+ ASSERT_OK(PlatformFilename::FromString("C:\\foo\\bar", &fn));
+ ASSERT_EQ(fn.ToString(), "C:/foo/bar");
+ ASSERT_EQ(fn.ToNative(), L"C:\\foo\\bar");
+}
+#endif
+
+TEST(PlatformFilename, Invalid) {
+ PlatformFilename fn;
+ std::string s = "foo";
+ s += '\x00';  // embedded NUL: FromString() must reject it
+ ASSERT_RAISES(Invalid, PlatformFilename::FromString(s, &fn));
+}
+
TEST(CreateDirDeleteDir, Basics) {
const std::string BASE = "xxx-io-util-test-dir";
bool created, deleted;
@@ -74,15 +116,15 @@ TEST(CreateDirDeleteDir, Basics) {
}
TEST(TemporaryDir, Basics) {
- std::unique_ptr<TemporaryDir> dir;
+ std::unique_ptr<TemporaryDir> temp_dir;
PlatformFilename fn;
- ASSERT_OK(TemporaryDir::Make("some-prefix-", &dir));
- fn = dir->path();
+ ASSERT_OK(TemporaryDir::Make("some-prefix-", &temp_dir));
+ fn = temp_dir->path();
// Path has a trailing separator, for convenience
ASSERT_EQ(fn.ToString().back(), '/');
#if defined(_WIN32)
- ASSERT_EQ(fn.ToNative().back(), L'/');
+ ASSERT_EQ(fn.ToNative().back(), L'\\');
#else
ASSERT_EQ(fn.ToNative().back(), '/');
#endif
@@ -98,19 +140,41 @@ TEST(TemporaryDir, Basics) {
ASSERT_OK(CreateDir(child));
AssertExists(child);
- dir.reset();
+ temp_dir.reset();
AssertNotExists(fn);
AssertNotExists(child);
}
+TEST(CreateDirTree, Basics) {
+ std::unique_ptr<TemporaryDir> temp_dir;
+ PlatformFilename fn;
+ bool created;
+
+ ASSERT_OK(TemporaryDir::Make("io-util-test-", &temp_dir));
+
+ ASSERT_OK(temp_dir->path().Join("AB/CD", &fn));
+ ASSERT_OK(CreateDirTree(fn, &created));
+ ASSERT_TRUE(created);
+ ASSERT_OK(CreateDirTree(fn, &created));
+ ASSERT_FALSE(created);
+ // Already-existing directories are not an error, but report created == false
+ ASSERT_OK(temp_dir->path().Join("AB", &fn));
+ ASSERT_OK(CreateDirTree(fn, &created));
+ ASSERT_FALSE(created);
+
+ ASSERT_OK(temp_dir->path().Join("EF", &fn));
+ ASSERT_OK(CreateDirTree(fn, &created));
+ ASSERT_TRUE(created);
+}
+
TEST(DeleteFile, Basics) {
- std::unique_ptr<TemporaryDir> dir;
+ std::unique_ptr<TemporaryDir> temp_dir;
PlatformFilename fn;
int fd;
bool deleted;
- ASSERT_OK(TemporaryDir::Make("io-util-test-", &dir));
- ASSERT_OK(dir->path().Join("test-file", &fn));
+ ASSERT_OK(TemporaryDir::Make("io-util-test-", &temp_dir));
+ ASSERT_OK(temp_dir->path().Join("test-file", &fn));
AssertNotExists(fn);
ASSERT_OK(FileOpenWritable(fn, true /* write_only */, true /* truncate */,
@@ -125,7 +189,7 @@ TEST(DeleteFile, Basics) {
AssertNotExists(fn);
// Cannot call DeleteFile on directory
- ASSERT_OK(dir->path().Join("test-dir", &fn));
+ ASSERT_OK(temp_dir->path().Join("test-dir", &fn));
ASSERT_OK(CreateDir(fn));
AssertExists(fn);
ASSERT_RAISES(IOError, DeleteFile(fn));
diff --git a/cpp/src/arrow/util/io-util.cc b/cpp/src/arrow/util/io-util.cc
index cb1237c..8ff4361 100644
--- a/cpp/src/arrow/util/io-util.cc
+++ b/cpp/src/arrow/util/io-util.cc
@@ -30,6 +30,7 @@
#include <cstring>
#include <iostream>
#include <random>
+#include <sstream>
#include <string>
#include <utility>
@@ -73,11 +74,6 @@
#include <unistd.h>
#endif
-// POSIX systems do not have this
-#ifndef O_BINARY
-#define O_BINARY 0
-#endif
-
// define max read/write count
#if defined(_WIN32)
#define ARROW_MAX_IO_CHUNKSIZE INT32_MAX
@@ -185,6 +181,27 @@ namespace internal {
namespace bfs = ::boost::filesystem;
+namespace {
+
+#if _WIN32
+using NativePathCodeCvt = std::codecvt_utf8_utf16<wchar_t>;
+#endif
+
+Status StringToNative(const std::string& s, NativePathString* out) {
+#if _WIN32
+ try {
+ *out = std::wstring_convert<NativePathCodeCvt>{}.from_bytes(s);
+ } catch (const std::range_error& e) {
+ return Status::Invalid(e.what());
+ }
+#else
+ *out = s;
+#endif
+ return Status::OK();
+}
+
+} // namespace
+
#define BOOST_FILESYSTEM_TRY try {
#define BOOST_FILESYSTEM_CATCH \
} \
@@ -212,11 +229,31 @@ static std::string MakeRandomName(int num_chars) {
return s;
}
+std::string ErrnoMessage(int errnum) { return std::strerror(errnum); }
+// NOTE(review): ErrnoMessage() relies on std::strerror(), which need not be thread-safe
+#if _WIN32
+std::string WinErrorMessage(int errnum) {
+ char buf[1024];
+ auto nchars = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, errnum, 0, buf, sizeof(buf), NULL);
+ if (nchars == 0) {
+ // Fallback when FormatMessageA() fails
+ return "Windows error #" + std::to_string(errnum);
+ }
+ // System messages end with a line break; strip it so it doesn't leak into Status text
+ while (nchars > 0 && (buf[nchars - 1] == '\r' || buf[nchars - 1] == '\n')) --nchars;
+ return std::string(buf, nchars);
+}
+#endif
+
//
// PlatformFilename implementation
//
struct PlatformFilename::Impl {
+ Impl() = default;
+ explicit Impl(bfs::path p) : path(p.make_preferred()) {}
+
bfs::path path;
};
@@ -244,33 +281,25 @@ PlatformFilename& PlatformFilename::operator=(PlatformFilename&& other) {
return *this;
}
-PlatformFilename::PlatformFilename(const PlatformFilename::NativePathString& path)
+PlatformFilename::PlatformFilename(const NativePathString& path)
: PlatformFilename(Impl{path}) {}
-const PlatformFilename::NativePathString& PlatformFilename::ToNative() const {
+const NativePathString& PlatformFilename::ToNative() const {
return impl_->path.native();
}
-std::string PlatformFilename::ToString() const { return impl_->path.string(); }
-
-namespace {
-
-Status StringToNative(const std::string& s, PlatformFilename::NativePathString* out) {
-#if defined(_WIN32)
- try {
- *out = std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(s);
- } catch (std::range_error& e) {
- return Status::Invalid(e.what());
- }
+std::string PlatformFilename::ToString() const {
+#if _WIN32
+ return impl_->path.generic_string(NativePathCodeCvt());  // UTF-8, '/'-separated
#else
- *out = s;
+ return impl_->path.generic_string();  // '/'-separated
#endif
- return Status::OK();
}
-} // namespace
-
Status PlatformFilename::FromString(const std::string& file_name, PlatformFilename* out) {
+ if (file_name.find_first_of('\0') != std::string::npos) {  // NUL would truncate c_str()
+ return Status::Invalid("Embedded NUL char in file name: '", file_name, "'");
+ }
NativePathString ns;
RETURN_NOT_OK(StringToNative(file_name, &ns));
*out = PlatformFilename(std::move(ns));
@@ -297,9 +326,26 @@ Status CreateDir(const PlatformFilename& dir_path, bool* created) {
return Status::OK();
}
+Status CreateDirTree(const PlatformFilename& dir_path, bool* created) {
+ bool res = false;
+ BOOST_FILESYSTEM_TRY
+ res = bfs::create_directories(dir_path.impl_->path);  // false if it already existed
+ BOOST_FILESYSTEM_CATCH
+ if (created) {
+ *created = res;
+ }
+ return Status::OK();
+}
+// Refuses to delete anything that is not a directory (symlinks are not followed)
Status DeleteDirTree(const PlatformFilename& dir_path, bool* deleted) {
BOOST_FILESYSTEM_TRY
- auto n_removed = bfs::remove_all(dir_path.impl_->path);
+ const auto& path = dir_path.impl_->path;
+ // XXX Race: the path may change type between symlink_status() and remove_all()
+ auto st = bfs::symlink_status(path);
+ if (st.type() != bfs::file_not_found && st.type() != bfs::directory_file) {
+ return Status::IOError("Cannot delete non-directory '", path.string(), "'");
+ }
+ auto n_removed = bfs::remove_all(path);
if (deleted) {
*deleted = n_removed != 0;
}
@@ -317,7 +363,7 @@ Status DeleteFile(const PlatformFilename& file_path, bool* deleted) {
if (!bfs::is_directory(st)) {
res = bfs::remove(path);
} else {
- return Status::IOError("Cannot delete directory '", path.string());
+ return Status::IOError("Cannot delete directory '", path.string(), "'");
}
if (deleted) {
*deleted = res;
@@ -360,8 +406,8 @@ static inline Status CheckFileOpResult(int ret, int errno_actual,
const PlatformFilename& file_name,
const char* opname) {
if (ret == -1) {
- return Status::IOError("Failed to ", opname, " file: ", file_name.ToString(),
- " , error: ", std::strerror(errno_actual));
+ return Status::IOError("Failed to ", opname, " file '", file_name.ToString(),
+ "', error: ", ErrnoMessage(errno_actual));
}
return Status::OK();
}
@@ -373,8 +419,22 @@ Status FileOpenReadable(const PlatformFilename& file_name, int* fd) {
_O_RDONLY | _O_BINARY | _O_NOINHERIT, _SH_DENYNO, _S_IREAD);
ret = *fd;
#else
- ret = *fd = open(file_name.ToNative().c_str(), O_RDONLY | O_BINARY);
+ ret = *fd = open(file_name.ToNative().c_str(), O_RDONLY);
errno_actual = errno;
+
+ if (ret >= 0) {
+ // open(O_RDONLY) succeeds on directories, check for it
+ struct stat st;
+ ret = fstat(*fd, &st);
+ if (ret == -1) {
+ errno_actual = errno;  // report fstat()'s error, saved before FileClose() runs
+ ARROW_UNUSED(FileClose(*fd));  // the error is propagated below
+ } else if (S_ISDIR(st.st_mode)) {
+ ARROW_UNUSED(FileClose(*fd));
+ return Status::IOError("Cannot open for reading: path '", file_name.ToString(),
+ "' is a directory");
+ }
+ }
#endif
return CheckFileOpResult(ret, errno_actual, file_name, "open local");
@@ -405,7 +465,7 @@ Status FileOpenWritable(const PlatformFilename& file_name, bool write_only, bool
ret = *fd;
#else
- int oflag = O_CREAT | O_BINARY;
+ int oflag = O_CREAT;
if (truncate) {
oflag |= O_TRUNC;
@@ -423,7 +483,16 @@ Status FileOpenWritable(const PlatformFilename& file_name, bool write_only, bool
ret = *fd = open(file_name.ToNative().c_str(), oflag, ARROW_WRITE_SHMODE);
errno_actual = errno;
#endif
- return CheckFileOpResult(ret, errno_actual, file_name, "open local");
+ RETURN_NOT_OK(CheckFileOpResult(ret, errno_actual, file_name, "open local"));
+ if (append) {  // Seek to end, as O_APPEND does not necessarily do it
+ const auto end_pos = lseek64_compat(*fd, 0, SEEK_END);
+ if (end_pos == -1) {
+ const int seek_errno = errno;  // save before FileClose() can clobber errno
+ ARROW_UNUSED(FileClose(*fd));
+ return Status::IOError("lseek failed: ", ErrnoMessage(seek_errno));
+ }
+ }
+ return Status::OK();
}
Status FileTell(int fd, int64_t* pos) {
@@ -452,7 +521,7 @@ Status CreatePipe(int fd[2]) {
#endif
if (ret == -1) {
- return Status::IOError("Error creating pipe: ", std::strerror(errno));
+ return Status::IOError("Error creating pipe: ", ErrnoMessage(errno));
}
return Status::OK();
}
@@ -461,7 +530,7 @@ static Status StatusFromErrno(const char* prefix) {
#ifdef _WIN32
errno = __map_mman_error(GetLastError(), EPERM);
#endif
- return Status::IOError(prefix, std::strerror(errno));
+ return Status::IOError(prefix, ErrnoMessage(errno));
}
//
@@ -643,8 +712,7 @@ Status FileRead(int fd, uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
}
if (*bytes_read == -1) {
- return Status::IOError(std::string("Error reading bytes from file: ") +
- std::string(strerror(errno)));
+ return Status::IOError("Error reading bytes from file: ", ErrnoMessage(errno));
}
return Status::OK();
@@ -673,8 +741,7 @@ Status FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes,
}
if (*bytes_read == -1) {
- return Status::IOError(std::string("Error reading bytes from file: ") +
- std::string(strerror(errno)));
+ return Status::IOError("Error reading bytes from file: ", ErrnoMessage(errno));
}
return Status::OK();
}
@@ -704,8 +771,7 @@ Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes) {
}
if (ret == -1) {
- return Status::IOError(std::string("Error writing bytes from file: ") +
- std::string(strerror(errno)));
+ return Status::IOError("Error writing bytes to file: ", ErrnoMessage(errno));
}
return Status::OK();
}
@@ -722,8 +788,7 @@ Status FileTruncate(int fd, const int64_t size) {
#endif
if (ret == -1) {
- return Status::IOError(std::string("Error truncating file: ") +
- std::string(strerror(errno_actual)));
+ return Status::IOError("Error truncating file: ", ErrnoMessage(errno_actual));
}
return Status::OK();
}
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
index b883ec9..5d01f40 100644
--- a/cpp/src/arrow/util/io-util.h
+++ b/cpp/src/arrow/util/io-util.h
@@ -94,14 +94,17 @@ class ARROW_EXPORT StdinStream : public InputStream {
namespace internal {
-class ARROW_EXPORT PlatformFilename {
- public:
+// NOTE: on Windows, 8-bit (std::string) path strings are encoded as UTF-8,
+// because the legacy MBCS code page cannot represent every possible path.
+
#if defined(_WIN32)
- using NativePathString = std::wstring;
+using NativePathString = std::wstring;
#else
- using NativePathString = std::string;
+using NativePathString = std::string;
#endif
+class ARROW_EXPORT PlatformFilename {
+ public:
~PlatformFilename();
PlatformFilename();
PlatformFilename(const PlatformFilename&);
@@ -126,6 +129,7 @@ class ARROW_EXPORT PlatformFilename {
// Those functions need access to the embedded path object
friend ARROW_EXPORT Status CreateDir(const PlatformFilename&, bool*);
+ friend ARROW_EXPORT Status CreateDirTree(const PlatformFilename&, bool*);
friend ARROW_EXPORT Status DeleteDirTree(const PlatformFilename&, bool*);
friend ARROW_EXPORT Status DeleteFile(const PlatformFilename&, bool*);
friend ARROW_EXPORT Status FileExists(const PlatformFilename&, bool*);
@@ -134,6 +138,8 @@ class ARROW_EXPORT PlatformFilename {
ARROW_EXPORT
Status CreateDir(const PlatformFilename& dir_path, bool* created = NULLPTR);
ARROW_EXPORT
+Status CreateDirTree(const PlatformFilename& dir_path, bool* created = NULLPTR);
+ARROW_EXPORT
Status DeleteDirTree(const PlatformFilename& dir_path, bool* deleted = NULLPTR);
ARROW_EXPORT
Status DeleteFile(const PlatformFilename& file_path, bool* deleted = NULLPTR);
@@ -191,6 +197,13 @@ Status DelEnvVar(const char* name);
ARROW_EXPORT
Status DelEnvVar(const std::string& name);
+ARROW_EXPORT
+std::string ErrnoMessage(int errnum);  // Human-readable text for an errno value
+#if _WIN32
+ARROW_EXPORT
+std::string WinErrorMessage(int errnum);  // Human-readable text for a Windows error code
+#endif
+
class ARROW_EXPORT TemporaryDir {
public:
~TemporaryDir();