You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/05/10 16:11:06 UTC

[arrow] branch master updated: ARROW-15587: [C++] Add support for all options specified by substrait::ReadRel::LocalFiles::FileOrFiles

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new dc23c67b7f ARROW-15587: [C++] Add support for all options specified by substrait::ReadRel::LocalFiles::FileOrFiles
dc23c67b7f is described below

commit dc23c67b7f3d15d54eba860650b74f969fb4f06c
Author: Ariana Villegas <ar...@utec.edu.pe>
AuthorDate: Tue May 10 18:10:57 2022 +0200

    ARROW-15587: [C++] Add support for all options specified by substrait::ReadRel::LocalFiles::FileOrFiles
    
    The Substrait read operator defines files with LocalFiles::FileOrFiles. These elements can take one of several forms:
    
    - uri_path (can be a file or a folder)
    - uri_path_glob (a glob expression)
    - uri_file (file only)
    - uri_folder (folder only)
    
    The C++ Substrait consumer currently only supports uri_file. This PR adds support for the other options.
    
    - [x] uri_path (can be a file or a folder)
    - [x] uri_path_glob (a glob expression)
    - [x] uri_folder (folder only)
    
    Closes #12625 from ArianaVillegas/ARROW-15587
    
    Lead-authored-by: Ariana Villegas <ar...@utec.edu.pe>
    Co-authored-by: ArianaVillegas <40...@users.noreply.github.com>
    Co-authored-by: Antoine Pitrou <an...@python.org>
    Co-authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 .../arrow/engine/substrait/relation_internal.cc    | 77 ++++++++++++++++------
 cpp/src/arrow/engine/substrait/serde_test.cc       |  4 +-
 cpp/src/arrow/filesystem/filesystem_test.cc        | 65 ++++++++++++++++++
 cpp/src/arrow/filesystem/path_util.cc              | 40 +++++++++++
 cpp/src/arrow/filesystem/path_util.h               | 11 ++++
 cpp/src/arrow/filesystem/util_internal.cc          | 60 +++++++++++++++++
 cpp/src/arrow/filesystem/util_internal.h           |  7 ++
 7 files changed, 243 insertions(+), 21 deletions(-)

diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc
index 4e4a4f5126..723edfe2ec 100644
--- a/cpp/src/arrow/engine/substrait/relation_internal.cc
+++ b/cpp/src/arrow/engine/substrait/relation_internal.cc
@@ -26,6 +26,7 @@
 #include "arrow/engine/substrait/expression_internal.h"
 #include "arrow/engine/substrait/type_internal.h"
 #include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/util_internal.h"
 
 namespace arrow {
 namespace engine {
@@ -88,22 +89,29 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
 
       std::shared_ptr<dataset::FileFormat> format;
       auto filesystem = std::make_shared<fs::LocalFileSystem>();
-      std::vector<std::shared_ptr<dataset::FileFragment>> fragments;
+      std::vector<fs::FileInfo> files;
 
       for (const auto& item : read.local_files().items()) {
-        if (item.path_type_case() !=
-            substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
-          return Status::NotImplemented(
-              "substrait::ReadRel::LocalFiles::FileOrFiles with "
-              "path_type other than uri_file");
+        std::string path;
+        if (item.path_type_case() ==
+            substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath) {
+          path = item.uri_path();
+        } else if (item.path_type_case() ==
+                   substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
+          path = item.uri_file();
+        } else if (item.path_type_case() ==
+                   substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder) {
+          path = item.uri_folder();
+        } else {
+          path = item.uri_path_glob();
         }
 
         if (item.format() ==
             substrait::ReadRel::LocalFiles::FileOrFiles::FILE_FORMAT_PARQUET) {
           format = std::make_shared<dataset::ParquetFileFormat>();
-        } else if (util::string_view{item.uri_file()}.ends_with(".arrow")) {
+        } else if (util::string_view{path}.ends_with(".arrow")) {
           format = std::make_shared<dataset::IpcFileFormat>();
-        } else if (util::string_view{item.uri_file()}.ends_with(".feather")) {
+        } else if (util::string_view{path}.ends_with(".feather")) {
           format = std::make_shared<dataset::IpcFileFormat>();
         } else {
           return Status::NotImplemented(
@@ -111,12 +119,11 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
               "other than FILE_FORMAT_PARQUET");
         }
 
-        if (!util::string_view{item.uri_file()}.starts_with("file:///")) {
-          return Status::NotImplemented(
-              "substrait::ReadRel::LocalFiles::FileOrFiles::uri_file "
-              "with other than local filesystem (file:///)");
+        if (!util::string_view{path}.starts_with("file:///")) {
+          return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", path,
+                                        ") with other than local filesystem "
+                                        "(file:///)");
         }
-        auto path = item.uri_file().substr(7);
 
         if (item.partition_index() != 0) {
           return Status::NotImplemented(
@@ -133,15 +140,45 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
               "non-default substrait::ReadRel::LocalFiles::FileOrFiles::length");
         }
 
-        ARROW_ASSIGN_OR_RAISE(auto fragment, format->MakeFragment(dataset::FileSource{
-                                                 std::move(path), filesystem}));
-        fragments.push_back(std::move(fragment));
+        // Strip the 7-character "file://" scheme; the prefix check above
+        // guarantees that what remains starts with '/'.
+        path = path.substr(7);
+        const auto path_type = item.path_type_case();
+        if (path_type == substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath) {
+          // uri_path may name either a file or a directory: ask the filesystem.
+          ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
+          if (file.type() == fs::FileType::File) {
+            files.push_back(std::move(file));
+          } else if (file.type() == fs::FileType::Directory) {
+            fs::FileSelector selector;
+            selector.base_dir = path;
+            selector.recursive = true;
+            ARROW_ASSIGN_OR_RAISE(auto discovered_files,
+                                  filesystem->GetFileInfo(selector));
+            // Append the listing to `files`; moving in the opposite direction
+            // would throw the listing away with the temporary.
+            std::move(discovered_files.begin(), discovered_files.end(),
+                      std::back_inserter(files));
+          }
+          // Any other file type (e.g. not found) contributes no files.
+        } else if (path_type == substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
+          // uri_file is taken at face value, without consulting the filesystem.
+          files.emplace_back(path, fs::FileType::File);
+        } else if (path_type ==
+                   substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder) {
+          fs::FileSelector selector;
+          selector.base_dir = path;
+          selector.recursive = true;
+          ARROW_ASSIGN_OR_RAISE(auto discovered_files, filesystem->GetFileInfo(selector));
+          std::move(discovered_files.begin(), discovered_files.end(),
+                    std::back_inserter(files));
+        } else {
+          // Remaining case: uri_path_glob. Chaining with `else if` above keeps
+          // uri_path items from also being expanded here.
+          ARROW_ASSIGN_OR_RAISE(auto discovered_files,
+                                fs::internal::GlobFiles(filesystem, path));
+          std::move(discovered_files.begin(), discovered_files.end(),
+                    std::back_inserter(files));
+        }
       }
 
-      ARROW_ASSIGN_OR_RAISE(
-          auto ds, dataset::FileSystemDataset::Make(
-                       std::move(base_schema), /*root_partition=*/compute::literal(true),
-                       std::move(format), std::move(filesystem), std::move(fragments)));
+      ARROW_ASSIGN_OR_RAISE(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                                 std::move(filesystem), std::move(files),
+                                                 std::move(format), {}));
+
+      ARROW_ASSIGN_OR_RAISE(auto ds, ds_factory->Finish(std::move(base_schema)));
 
       return compute::Declaration{
           "scan", dataset::ScanNodeOptions{std::move(ds), std::move(scan_options)}};
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index 300a6c528b..9c6f17d1e6 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -33,6 +33,7 @@
 using testing::ElementsAre;
 using testing::Eq;
 using testing::HasSubstr;
+using testing::UnorderedElementsAre;
 
 namespace arrow {
 
@@ -654,7 +655,8 @@ TEST(Substrait, ReadRel) {
   ASSERT_EQ(scan_node_options.dataset->type_name(), "filesystem");
   const auto& dataset =
       checked_cast<const dataset::FileSystemDataset&>(*scan_node_options.dataset);
-  EXPECT_THAT(dataset.files(), ElementsAre("/tmp/dat1.parquet", "/tmp/dat2.parquet"));
+  EXPECT_THAT(dataset.files(),
+              UnorderedElementsAre("/tmp/dat1.parquet", "/tmp/dat2.parquet"));
   EXPECT_EQ(dataset.format()->type_name(), "parquet");
   EXPECT_EQ(*dataset.schema(), Schema({field("i", int64()), field("b", boolean())}));
 }
diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc b/cpp/src/arrow/filesystem/filesystem_test.cc
index 44889356b1..bfe1282245 100644
--- a/cpp/src/arrow/filesystem/filesystem_test.cc
+++ b/cpp/src/arrow/filesystem/filesystem_test.cc
@@ -26,6 +26,7 @@
 #include "arrow/filesystem/mockfs.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/filesystem/test_util.h"
+#include "arrow/filesystem/util_internal.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/key_value_metadata.h"
@@ -265,6 +266,70 @@ TEST(PathUtil, ToSlashes) {
 #endif
 }
 
+TEST(PathUtil, Globber) {  // Checks Globber::Matches against several glob patterns
+  Globber empty("");  // an empty pattern must not match a non-empty path
+  ASSERT_FALSE(empty.Matches("/1.txt"));
+
+  Globber star("/*");
+  ASSERT_TRUE(star.Matches("/a.txt"));
+  ASSERT_TRUE(star.Matches("/b.csv"));
+  ASSERT_FALSE(star.Matches("/foo/c.parquet"));  // the star does not cross a '/'
+
+  Globber question("/a?b");
+  ASSERT_TRUE(question.Matches("/acb"));
+  ASSERT_FALSE(question.Matches("/a/b"));  // the question mark does not match '/'
+
+  Globber localfs_linux("/f?o/bar/a?/1*.txt");  // wildcards in several components
+  ASSERT_TRUE(localfs_linux.Matches("/foo/bar/a1/1.txt"));
+  ASSERT_TRUE(localfs_linux.Matches("/f#o/bar/ab/1000.txt"));
+  ASSERT_FALSE(localfs_linux.Matches("/f#o/bar/ab/1/23.txt"));  // one level too deep
+
+  Globber localfs_windows("C:/f?o/bar/a?/1*.txt");  // drive-letter style prefix
+  ASSERT_TRUE(localfs_windows.Matches("C:/f_o/bar/ac/1000.txt"));
+
+  Globber remotefs("/my|bucket(#?)/foo{*}/[?]bar~/b&z/a: *-c.txt");  // literal punctuation
+  ASSERT_TRUE(remotefs.Matches("/my|bucket(#0)/foo{}/[?]bar~/b&z/a: -c.txt"));
+  ASSERT_TRUE(remotefs.Matches("/my|bucket(#%)/foo{abc}/[_]bar~/b&z/a: ab-c.txt"));
+
+  Globber wildcards("/bucket?/f\\?o/\\*/*.parquet");  // backslash-escaped wildcards
+  ASSERT_TRUE(wildcards.Matches("/bucket0/f?o/*/abc.parquet"));
+  ASSERT_FALSE(wildcards.Matches("/bucket0/foo/ab/a.parquet"));  // escapes are literal
+}
+
+TEST(InternalUtil, GlobFiles) {  // Checks GlobFiles against a small MockFileSystem tree
+  auto fs = std::make_shared<MockFileSystem>(TimePoint{});
+
+  auto check_entries = [](const std::vector<FileInfo>& infos,  // compares paths only
+                          std::vector<std::string> expected) -> void {
+    std::vector<std::string> actual(infos.size());
+    std::transform(infos.begin(), infos.end(), actual.begin(),
+                   [](const FileInfo& file) { return file.path(); });
+    std::sort(actual.begin(), actual.end());  // `expected` must be given sorted
+    ASSERT_EQ(actual, expected);
+  };
+
+  ASSERT_OK(fs->CreateDir("A/CD"));
+  ASSERT_OK(fs->CreateDir("AB/CD"));
+  ASSERT_OK(fs->CreateDir("AB/CD/ab"));
+  CreateFile(fs.get(), "A/CD/ab.txt", "data");
+  CreateFile(fs.get(), "AB/CD/a.txt", "data");
+  CreateFile(fs.get(), "AB/CD/abc.txt", "data");
+  CreateFile(fs.get(), "AB/CD/ab/c.txt", "data");
+
+  FileInfoVector infos;
+  ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "A*/CD/?b*.txt"));  // wildcards at two levels
+  ASSERT_EQ(infos.size(), 2);
+  check_entries(infos, {"A/CD/ab.txt", "AB/CD/abc.txt"});
+
+  // Leading slash is optional but doesn't change behavior
+  ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "/A*/CD/?b*.txt"));
+  ASSERT_EQ(infos.size(), 2);
+  check_entries(infos, {"A/CD/ab.txt", "AB/CD/abc.txt"});
+
+  ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "A*/CD/?/b*.txt"));  // no one-character dir
+  ASSERT_EQ(infos.size(), 0);
+}
+
 ////////////////////////////////////////////////////////////////////////////
 // Generic MockFileSystem tests
 
diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc
index 2983e68fc9..83ff069ecf 100644
--- a/cpp/src/arrow/filesystem/path_util.cc
+++ b/cpp/src/arrow/filesystem/path_util.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <algorithm>
+#include <regex>
 
 #include "arrow/filesystem/path_util.h"
 #include "arrow/result.h"
@@ -287,6 +288,45 @@ bool IsLikelyUri(util::string_view v) {
   return ::arrow::internal::IsValidUriScheme(v.substr(0, pos));
 }
 
+/// Private implementation of Globber: the glob pattern compiled once into a
+/// std::regex that Globber::Matches evaluates.
+struct Globber::Impl {
+  std::regex pattern_;
+
+  explicit Impl(const std::string& p) : pattern_(PatternToRegex(p)) {}
+
+  /// Whether `c` is an ASCII letter, digit or underscore.
+  static bool IsWordChar(char c) {
+    return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
+           c == '_';
+  }
+
+  /// Translate the glob `p` into regular expression source.
+  ///
+  /// - `*` becomes "any run of characters other than '/'"
+  /// - `?` becomes "exactly one character other than '/'"
+  /// - a backslash makes the following character literal
+  /// - every other character matches itself
+  static std::string PatternToRegex(const std::string& p) {
+    // Characters that are escaped so that they match themselves in the regex.
+    const std::string special_chars = "()[]{}+-|^$\\.&~# \t\n\r\v\f";
+    std::string transformed;
+    for (auto it = p.begin(); it != p.end(); ++it) {
+      if (*it == '*') {
+        transformed += "[^/]*";
+      } else if (*it == '?') {
+        transformed += "[^/]";
+      } else if (*it == '\\' && it + 1 != p.end()) {
+        // Escaped character. Word characters are emitted bare, because putting
+        // a backslash in front of them would form a regex escape (\d, \b, \1)
+        // rather than a literal.
+        ++it;
+        if (!IsWordChar(*it)) {
+          transformed += '\\';
+        }
+        transformed += *it;
+      } else if (special_chars.find(*it) != std::string::npos) {
+        // Regex metacharacter, or a lone backslash at the very end of the
+        // pattern (nothing left to escape): match the character itself.
+        transformed += '\\';
+        transformed += *it;
+      } else {
+        transformed += *it;
+      }
+    }
+    return transformed;
+  }
+};
+
+Globber::Globber(std::string pattern) : impl_(new Impl(pattern)) {}  // compiles the glob
+
+Globber::~Globber() {}  // defined in this file, where Impl is a complete type
+
+bool Globber::Matches(const std::string& path) {  // true only if all of `path` matches
+  return regex_match(path, impl_->pattern_);  // resolves to std::regex_match via ADL
+}
+
 }  // namespace internal
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/path_util.h b/cpp/src/arrow/filesystem/path_util.h
index 471eec1a90..75745ee44c 100644
--- a/cpp/src/arrow/filesystem/path_util.h
+++ b/cpp/src/arrow/filesystem/path_util.h
@@ -128,6 +128,17 @@ bool IsEmptyPath(util::string_view s);
 ARROW_EXPORT
 bool IsLikelyUri(util::string_view s);
 
+class ARROW_EXPORT Globber {  // Matches paths against a glob pattern
+ public:
+  ~Globber();
+  explicit Globber(std::string pattern);  // `pattern` is the glob expression
+  bool Matches(const std::string& path);  // whether `path` matches the pattern
+
+ protected:
+  struct Impl;  // only declared here; the definition lives in the .cc file
+  std::unique_ptr<Impl> impl_;
+};
+
 }  // namespace internal
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc
index bab0ff1202..4851fccd55 100644
--- a/cpp/src/arrow/filesystem/util_internal.cc
+++ b/cpp/src/arrow/filesystem/util_internal.cc
@@ -20,6 +20,7 @@
 #include <cerrno>
 
 #include "arrow/buffer.h"
+#include "arrow/filesystem/path_util.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/util/io_util.h"
@@ -75,6 +76,65 @@ Status InvalidDeleteDirContents(const std::string& path) {
       "If you wish to delete the root directory's contents, call DeleteRootDirContents.");
 }
 
+/// Expand `glob` against `filesystem`, starting from the filesystem root.
+///
+/// The pattern is split into path components. Components without `*` or `?`
+/// are accumulated verbatim; each component containing a wildcard triggers one
+/// non-recursive listing per candidate directory, filtered through a Globber.
+/// Entries that turn out not to exist are dropped from the result.
+///
+/// \param filesystem the filesystem to list
+/// \param glob the glob pattern; a leading slash is optional
+/// \return the matching entries, or the first error reported by the filesystem
+Result<FileInfoVector> GlobFiles(const std::shared_ptr<FileSystem>& filesystem,
+                                 const std::string& glob) {
+  // The candidate entries at the current depth level.
+  // We start with the filesystem root.
+  FileInfoVector results{FileInfo("", FileType::Directory)};
+  // The exact tail that will later require matching with candidate entries
+  std::string current_tail;
+
+  auto split_glob = SplitAbstractPath(glob, '/');
+
+  // Process one depth level at once, from root to leaf
+  for (const auto& glob_component : split_glob) {
+    if (glob_component.find_first_of("*?") == std::string::npos) {
+      // If there are no wildcards at the current level, just append
+      // the exact glob path component.
+      current_tail = ConcatAbstractPath(current_tail, glob_component);
+      continue;
+    }
+
+    FileInfoVector children;
+    for (const auto& res : results) {
+      if (res.type() != FileType::Directory) {
+        continue;
+      }
+      FileSelector selector;
+      selector.base_dir = current_tail.empty()
+                              ? res.path()
+                              : ConcatAbstractPath(res.path(), current_tail);
+      // A candidate directory that lacks the literal tail simply has no
+      // matches; it must not turn the whole glob into an error.
+      selector.allow_not_found = true;
+      ARROW_ASSIGN_OR_RAISE(auto entries, filesystem->GetFileInfo(selector));
+      Globber globber(ConcatAbstractPath(selector.base_dir, glob_component));
+      for (auto&& entry : entries) {
+        if (globber.Matches(entry.path())) {
+          children.push_back(std::move(entry));
+        }
+      }
+    }
+    results = std::move(children);
+    current_tail.clear();
+  }
+
+  // Trailing wildcard-free components: resolve them with one batched lookup.
+  if (!current_tail.empty()) {
+    std::vector<std::string> paths;
+    paths.reserve(results.size());
+    for (const auto& file : results) {
+      paths.push_back(ConcatAbstractPath(file.path(), current_tail));
+    }
+    ARROW_ASSIGN_OR_RAISE(results, filesystem->GetFileInfo(paths));
+  }
+
+  // The batched lookup reports missing paths as NotFound entries: drop them.
+  std::vector<FileInfo> out;
+  out.reserve(results.size());
+  for (auto&& file : results) {
+    if (file.type() != FileType::NotFound) {
+      out.push_back(std::move(file));
+    }
+  }
+
+  return out;
+}
+
 FileSystemGlobalOptions global_options;
 
 }  // namespace internal
diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h
index 915c8d03d4..80a29feb85 100644
--- a/cpp/src/arrow/filesystem/util_internal.h
+++ b/cpp/src/arrow/filesystem/util_internal.h
@@ -49,6 +49,13 @@ Status NotAFile(const std::string& path);
 ARROW_EXPORT
 Status InvalidDeleteDirContents(const std::string& path);
 
+/// \brief Return files matching the glob pattern on the filesystem
+///
+/// Globbing starts from the root of the filesystem; only `*` and `?` are wildcards.
+ARROW_EXPORT
+Result<FileInfoVector> GlobFiles(const std::shared_ptr<FileSystem>& filesystem,
+                                 const std::string& glob);
+
 extern FileSystemGlobalOptions global_options;
 
 }  // namespace internal