You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/05/10 16:11:06 UTC
[arrow] branch master updated: ARROW-15587: [C++] Add support for all options specified by substrait::ReadRel::LocalFiles::FileOrFiles
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new dc23c67b7f ARROW-15587: [C++] Add support for all options specified by substrait::ReadRel::LocalFiles::FileOrFiles
dc23c67b7f is described below
commit dc23c67b7f3d15d54eba860650b74f969fb4f06c
Author: Ariana Villegas <ar...@utec.edu.pe>
AuthorDate: Tue May 10 18:10:57 2022 +0200
ARROW-15587: [C++] Add support for all options specified by substrait::ReadRel::LocalFiles::FileOrFiles
The Substrait read operator defines files with LocalFiles::FileOrFiles. These elements can take one of several forms:
- uri_path (can be a file or a folder)
- uri_path_glob (a glob expression)
- uri_file (file only)
- uri_folder (folder only)
The C++ Substrait consumer currently only supports uri_file. This PR adds support for the other options.
- [x] uri_path (can be a file or a folder)
- [x] uri_path_glob (a glob expression)
- [x] uri_folder (folder only)
Closes #12625 from ArianaVillegas/ARROW-15587
Lead-authored-by: Ariana Villegas <ar...@utec.edu.pe>
Co-authored-by: ArianaVillegas <40...@users.noreply.github.com>
Co-authored-by: Antoine Pitrou <an...@python.org>
Co-authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
.../arrow/engine/substrait/relation_internal.cc | 77 ++++++++++++++++------
cpp/src/arrow/engine/substrait/serde_test.cc | 4 +-
cpp/src/arrow/filesystem/filesystem_test.cc | 65 ++++++++++++++++++
cpp/src/arrow/filesystem/path_util.cc | 40 +++++++++++
cpp/src/arrow/filesystem/path_util.h | 11 ++++
cpp/src/arrow/filesystem/util_internal.cc | 60 +++++++++++++++++
cpp/src/arrow/filesystem/util_internal.h | 7 ++
7 files changed, 243 insertions(+), 21 deletions(-)
diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc
index 4e4a4f5126..723edfe2ec 100644
--- a/cpp/src/arrow/engine/substrait/relation_internal.cc
+++ b/cpp/src/arrow/engine/substrait/relation_internal.cc
@@ -26,6 +26,7 @@
#include "arrow/engine/substrait/expression_internal.h"
#include "arrow/engine/substrait/type_internal.h"
#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/util_internal.h"
namespace arrow {
namespace engine {
@@ -88,22 +89,29 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
std::shared_ptr<dataset::FileFormat> format;
auto filesystem = std::make_shared<fs::LocalFileSystem>();
- std::vector<std::shared_ptr<dataset::FileFragment>> fragments;
+ std::vector<fs::FileInfo> files;
for (const auto& item : read.local_files().items()) {
- if (item.path_type_case() !=
- substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
- return Status::NotImplemented(
- "substrait::ReadRel::LocalFiles::FileOrFiles with "
- "path_type other than uri_file");
+ std::string path;
+ if (item.path_type_case() ==
+ substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath) {
+ path = item.uri_path();
+ } else if (item.path_type_case() ==
+ substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
+ path = item.uri_file();
+ } else if (item.path_type_case() ==
+ substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder) {
+ path = item.uri_folder();
+ } else {
+ path = item.uri_path_glob();
}
if (item.format() ==
substrait::ReadRel::LocalFiles::FileOrFiles::FILE_FORMAT_PARQUET) {
format = std::make_shared<dataset::ParquetFileFormat>();
- } else if (util::string_view{item.uri_file()}.ends_with(".arrow")) {
+ } else if (util::string_view{path}.ends_with(".arrow")) {
format = std::make_shared<dataset::IpcFileFormat>();
- } else if (util::string_view{item.uri_file()}.ends_with(".feather")) {
+ } else if (util::string_view{path}.ends_with(".feather")) {
format = std::make_shared<dataset::IpcFileFormat>();
} else {
return Status::NotImplemented(
@@ -111,12 +119,11 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
"other than FILE_FORMAT_PARQUET");
}
- if (!util::string_view{item.uri_file()}.starts_with("file:///")) {
- return Status::NotImplemented(
- "substrait::ReadRel::LocalFiles::FileOrFiles::uri_file "
- "with other than local filesystem (file:///)");
+ if (!util::string_view{path}.starts_with("file:///")) {
+ return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", path,
+ ") with other than local filesystem "
+ "(file:///)");
}
- auto path = item.uri_file().substr(7);
if (item.partition_index() != 0) {
return Status::NotImplemented(
@@ -133,15 +140,54 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
"non-default substrait::ReadRel::LocalFiles::FileOrFiles::length");
}
- ARROW_ASSIGN_OR_RAISE(auto fragment, format->MakeFragment(dataset::FileSource{
- std::move(path), filesystem}));
- fragments.push_back(std::move(fragment));
+ path = path.substr(7);
+ // Only regular files can become dataset fragments, but recursive directory
+ // listings and glob matches may also return directories: keep files only.
+ auto is_file = [](const fs::FileInfo& info) { return info.IsFile(); };
+ if (item.path_type_case() ==
+ substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath) {
+ // uri_path may name a single file or a directory (scanned recursively);
+ // any other file type (e.g. NotFound) contributes no files.
+ ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
+ if (file.type() == fs::FileType::File) {
+ files.push_back(std::move(file));
+ } else if (file.type() == fs::FileType::Directory) {
+ fs::FileSelector selector;
+ selector.base_dir = path;
+ selector.recursive = true;
+ ARROW_ASSIGN_OR_RAISE(auto discovered_files,
+ filesystem->GetFileInfo(selector));
+ std::copy_if(discovered_files.begin(), discovered_files.end(),
+ std::back_inserter(files), is_file);
+ }
+ } else if (item.path_type_case() ==
+ substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
+ files.emplace_back(path, fs::FileType::File);
+ } else if (item.path_type_case() ==
+ substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder) {
+ fs::FileSelector selector;
+ selector.base_dir = path;
+ selector.recursive = true;
+ ARROW_ASSIGN_OR_RAISE(auto discovered_files, filesystem->GetFileInfo(selector));
+ std::copy_if(discovered_files.begin(), discovered_files.end(),
+ std::back_inserter(files), is_file);
+ } else {
+ // uri_path_glob: `path` is a glob pattern.
+ // NOTE(review): GlobFiles walks from the filesystem root "" and re-joins
+ // components without the leading '/', so with LocalFileSystem an absolute
+ // pattern may be resolved relative to the working directory -- confirm.
+ ARROW_ASSIGN_OR_RAISE(auto discovered_files,
+ fs::internal::GlobFiles(filesystem, path));
+ std::copy_if(discovered_files.begin(), discovered_files.end(),
+ std::back_inserter(files), is_file);
+ }
}
- ARROW_ASSIGN_OR_RAISE(
- auto ds, dataset::FileSystemDataset::Make(
- std::move(base_schema), /*root_partition=*/compute::literal(true),
- std::move(format), std::move(filesystem), std::move(fragments)));
+ ARROW_ASSIGN_OR_RAISE(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+ std::move(filesystem), std::move(files),
+ std::move(format), {}));
+
+ ARROW_ASSIGN_OR_RAISE(auto ds, ds_factory->Finish(std::move(base_schema)));
return compute::Declaration{
"scan", dataset::ScanNodeOptions{std::move(ds), std::move(scan_options)}};
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index 300a6c528b..9c6f17d1e6 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -33,6 +33,7 @@
using testing::ElementsAre;
using testing::Eq;
using testing::HasSubstr;
+using testing::UnorderedElementsAre;
namespace arrow {
@@ -654,7 +655,8 @@ TEST(Substrait, ReadRel) {
ASSERT_EQ(scan_node_options.dataset->type_name(), "filesystem");
const auto& dataset =
checked_cast<const dataset::FileSystemDataset&>(*scan_node_options.dataset);
- EXPECT_THAT(dataset.files(), ElementsAre("/tmp/dat1.parquet", "/tmp/dat2.parquet"));
+ EXPECT_THAT(dataset.files(),  // order-insensitive: file order is not asserted
+ UnorderedElementsAre("/tmp/dat1.parquet", "/tmp/dat2.parquet"));
EXPECT_EQ(dataset.format()->type_name(), "parquet");
EXPECT_EQ(*dataset.schema(), Schema({field("i", int64()), field("b", boolean())}));
}
diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc b/cpp/src/arrow/filesystem/filesystem_test.cc
index 44889356b1..bfe1282245 100644
--- a/cpp/src/arrow/filesystem/filesystem_test.cc
+++ b/cpp/src/arrow/filesystem/filesystem_test.cc
@@ -26,6 +26,7 @@
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/test_util.h"
+#include "arrow/filesystem/util_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/key_value_metadata.h"
@@ -265,6 +266,83 @@ TEST(PathUtil, ToSlashes) {
#endif
}
+TEST(PathUtil, Globber) {
+ Globber empty("");
+ ASSERT_FALSE(empty.Matches("/1.txt"));
+
+ Globber star("/*");
+ ASSERT_TRUE(star.Matches("/a.txt"));
+ ASSERT_TRUE(star.Matches("/b.csv"));
+ ASSERT_FALSE(star.Matches("/foo/c.parquet"));
+
+ Globber question("/a?b");
+ ASSERT_TRUE(question.Matches("/acb"));
+ ASSERT_FALSE(question.Matches("/a/b"));
+
+ Globber localfs_linux("/f?o/bar/a?/1*.txt");
+ ASSERT_TRUE(localfs_linux.Matches("/foo/bar/a1/1.txt"));
+ ASSERT_TRUE(localfs_linux.Matches("/f#o/bar/ab/1000.txt"));
+ ASSERT_FALSE(localfs_linux.Matches("/f#o/bar/ab/1/23.txt"));
+
+ Globber localfs_windows("C:/f?o/bar/a?/1*.txt");
+ ASSERT_TRUE(localfs_windows.Matches("C:/f_o/bar/ac/1000.txt"));
+
+ Globber remotefs("/my|bucket(#?)/foo{*}/[?]bar~/b&z/a: *-c.txt");
+ ASSERT_TRUE(remotefs.Matches("/my|bucket(#0)/foo{}/[?]bar~/b&z/a: -c.txt"));
+ ASSERT_TRUE(remotefs.Matches("/my|bucket(#%)/foo{abc}/[_]bar~/b&z/a: ab-c.txt"));
+
+ Globber wildcards("/bucket?/f\\?o/\\*/*.parquet");
+ ASSERT_TRUE(wildcards.Matches("/bucket0/f?o/*/abc.parquet"));
+ ASSERT_FALSE(wildcards.Matches("/bucket0/foo/ab/a.parquet"));
+}
+
+TEST(InternalUtil, GlobFiles) {
+ auto fs = std::make_shared<MockFileSystem>(TimePoint{});
+
+ auto check_entries = [](const std::vector<FileInfo>& infos,
+ std::vector<std::string> expected) -> void {
+ std::vector<std::string> actual(infos.size());
+ std::transform(infos.begin(), infos.end(), actual.begin(),
+ [](const FileInfo& file) { return file.path(); });
+ std::sort(actual.begin(), actual.end());
+ ASSERT_EQ(actual, expected);
+ };
+
+ ASSERT_OK(fs->CreateDir("A/CD"));
+ ASSERT_OK(fs->CreateDir("AB/CD"));
+ ASSERT_OK(fs->CreateDir("AB/CD/ab"));
+ CreateFile(fs.get(), "A/CD/ab.txt", "data");
+ CreateFile(fs.get(), "AB/CD/a.txt", "data");
+ CreateFile(fs.get(), "AB/CD/abc.txt", "data");
+ CreateFile(fs.get(), "AB/CD/ab/c.txt", "data");
+
+ FileInfoVector infos;
+ ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "A*/CD/?b*.txt"));
+ ASSERT_EQ(infos.size(), 2);
+ check_entries(infos, {"A/CD/ab.txt", "AB/CD/abc.txt"});
+
+ // Leading slash is optional but doesn't change behavior
+ ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "/A*/CD/?b*.txt"));
+ ASSERT_EQ(infos.size(), 2);
+ check_entries(infos, {"A/CD/ab.txt", "AB/CD/abc.txt"});
+
+ ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "A*/CD/?/b*.txt"));
+ ASSERT_EQ(infos.size(), 0);
+
+ // A wildcard-free trailing component is resolved by direct lookup under each
+ // candidate; matching directories are returned as well as files.
+ ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "A*/CD"));
+ check_entries(infos, {"A/CD", "AB/CD"});
+
+ // Candidates under which the trailing component does not exist are dropped.
+ ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "A*/XY"));
+ ASSERT_EQ(infos.size(), 0);
+
+ // A pattern without any wildcard behaves like a plain lookup.
+ ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "AB/CD/abc.txt"));
+ check_entries(infos, {"AB/CD/abc.txt"});
+}
+
////////////////////////////////////////////////////////////////////////////
// Generic MockFileSystem tests
diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc
index 2983e68fc9..83ff069ecf 100644
--- a/cpp/src/arrow/filesystem/path_util.cc
+++ b/cpp/src/arrow/filesystem/path_util.cc
@@ -16,6 +16,7 @@
// under the License.
#include <algorithm>
+#include <regex>
#include "arrow/filesystem/path_util.h"
#include "arrow/result.h"
@@ -287,6 +288,57 @@ bool IsLikelyUri(util::string_view v) {
return ::arrow::internal::IsValidUriScheme(v.substr(0, pos));
}
+// Private state of Globber: the glob pattern compiled into an ECMAScript regex.
+struct Globber::Impl {
+ std::regex pattern_;
+
+ explicit Impl(const std::string& p) : pattern_(std::regex(PatternToRegex(p))) {}
+
+ // Translate a glob pattern into an equivalent ECMAScript regex:
+ // - '*' matches any (possibly empty) run of characters other than '/'
+ // - '?' matches exactly one character other than '/'
+ // - '\' makes the following character literal (a trailing '\' is literal)
+ // - every other character is matched literally
+ static std::string PatternToRegex(const std::string& p) {
+ // Characters that must be backslash-escaped to be matched literally.
+ // '*' and '?' are included for escaped wildcards ("\*", "\?").
+ static const std::string kSpecialChars = "()[]{}+-|^$\\.&~# \t\n\r\v\f*?";
+ std::string transformed;
+ auto append_literal = [&transformed](char c) {
+ if (kSpecialChars.find(c) != std::string::npos) {
+ transformed += '\\';
+ }
+ transformed += c;
+ };
+ for (auto it = p.begin(); it != p.end(); ++it) {
+ if (*it == '\\') {
+ // Escape: match the next character literally. Emitting it through
+ // append_literal avoids producing regex escapes such as "\d" or "\1".
+ if (++it == p.end()) {
+ append_literal('\\');
+ break;
+ }
+ append_literal(*it);
+ } else if (*it == '*') {
+ transformed += "[^/]*";
+ } else if (*it == '?') {
+ transformed += "[^/]";
+ } else {
+ append_literal(*it);
+ }
+ }
+ return transformed;
+ }
+};
+
+Globber::Globber(std::string pattern) : impl_(new Impl(pattern)) {}
+
+Globber::~Globber() {}
+
+bool Globber::Matches(const std::string& path) {
+ return std::regex_match(path, impl_->pattern_);
+}
+
} // namespace internal
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/path_util.h b/cpp/src/arrow/filesystem/path_util.h
index 471eec1a90..75745ee44c 100644
--- a/cpp/src/arrow/filesystem/path_util.h
+++ b/cpp/src/arrow/filesystem/path_util.h
@@ -128,6 +128,24 @@ bool IsEmptyPath(util::string_view s);
ARROW_EXPORT
bool IsLikelyUri(util::string_view s);
+/// \brief Match paths against a shell-like glob pattern
+///
+/// '*' matches any run of characters other than '/' and '?' matches a single
+/// character other than '/'; a wildcard preceded by '\' is matched literally.
+/// Matching is anchored: the whole path must match the pattern.
+class ARROW_EXPORT Globber {
+ public:
+ ~Globber();
+ explicit Globber(std::string pattern);
+
+ /// \brief Return whether the entire `path` matches the pattern
+ bool Matches(const std::string& path);
+
+ private:
+ struct Impl;
+ std::unique_ptr<Impl> impl_;
+};
+
} // namespace internal
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc
index bab0ff1202..4851fccd55 100644
--- a/cpp/src/arrow/filesystem/util_internal.cc
+++ b/cpp/src/arrow/filesystem/util_internal.cc
@@ -20,6 +20,7 @@
#include <cerrno>
#include "arrow/buffer.h"
+#include "arrow/filesystem/path_util.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/io_util.h"
@@ -75,6 +76,74 @@ Status InvalidDeleteDirContents(const std::string& path) {
"If you wish to delete the root directory's contents, call DeleteRootDirContents.");
}
+Result<FileInfoVector> GlobFiles(const std::shared_ptr<FileSystem>& filesystem,
+ const std::string& glob) {
+ // NOTE(review): the walk starts from the root entry "" and paths are re-joined
+ // with ConcatAbstractPath, so a leading '/' in `glob` is not passed on to the
+ // filesystem. Filesystems that need absolute paths (presumably LocalFileSystem
+ // on POSIX) may then resolve them relative to the working directory -- confirm.
+
+ // The candidate entries at the current depth level.
+ // We start with the filesystem root.
+ FileInfoVector results{FileInfo("", FileType::Directory)};
+ // The exact tail that will later require matching with candidate entries
+ std::string current_tail;
+
+ auto split_glob = SplitAbstractPath(glob, '/');
+
+ // Process one depth level at once, from root to leaf
+ for (const auto& glob_component : split_glob) {
+ if (glob_component.find_first_of("*?") == std::string::npos) {
+ // If there are no wildcards at the current level, just append
+ // the exact glob path component.
+ current_tail = ConcatAbstractPath(current_tail, glob_component);
+ continue;
+ }
+ // Compile the component pattern once per level and match it against entry
+ // base names only: the already-resolved parent path may contain characters
+ // ('*', '?', '\') that must not be reinterpreted as glob syntax.
+ Globber globber(glob_component);
+ FileInfoVector children;
+ for (const auto& res : results) {
+ if (res.type() != FileType::Directory) {
+ continue;
+ }
+ FileSelector selector;
+ selector.base_dir = current_tail.empty()
+ ? res.path()
+ : ConcatAbstractPath(res.path(), current_tail);
+ ARROW_ASSIGN_OR_RAISE(auto entries, filesystem->GetFileInfo(selector));
+ for (auto&& entry : entries) {
+ if (globber.Matches(entry.base_name())) {
+ children.push_back(std::move(entry));
+ }
+ }
+ }
+ results = std::move(children);
+ current_tail.clear();
+ }
+
+ // Resolve a trailing wildcard-free tail with one direct lookup per candidate.
+ if (!current_tail.empty()) {
+ std::vector<std::string> paths;
+ paths.reserve(results.size());
+ for (const auto& file : results) {
+ paths.push_back(ConcatAbstractPath(file.path(), current_tail));
+ }
+ ARROW_ASSIGN_OR_RAISE(results, filesystem->GetFileInfo(paths));
+ }
+
+ // Drop candidates that turned out not to exist.
+ FileInfoVector out;
+ for (auto&& file : results) {
+ if (file.type() != FileType::NotFound) {
+ out.push_back(std::move(file));
+ }
+ }
+
+ return out;
+}
+
FileSystemGlobalOptions global_options;
} // namespace internal
diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h
index 915c8d03d4..80a29feb85 100644
--- a/cpp/src/arrow/filesystem/util_internal.h
+++ b/cpp/src/arrow/filesystem/util_internal.h
@@ -49,6 +49,13 @@ Status NotAFile(const std::string& path);
ARROW_EXPORT
Status InvalidDeleteDirContents(const std::string& path);
+/// \brief Return entries (files or directories) matching the glob pattern
+///
+/// Globbing starts from the root of the filesystem; missing paths are omitted.
+ARROW_EXPORT
+Result<FileInfoVector> GlobFiles(const std::shared_ptr<FileSystem>& filesystem,
+ const std::string& glob);
+
extern FileSystemGlobalOptions global_options;
} // namespace internal