You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2020/12/08 12:19:10 UTC

[arrow] branch master updated: ARROW-10788: [C++] Make S3 recursive tree walks parallel

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d1b8ac0  ARROW-10788: [C++] Make S3 recursive tree walks parallel
d1b8ac0 is described below

commit d1b8ac07ef5b6f729e147458930e3e6196602dca
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Dec 8 13:18:14 2020 +0100

    ARROW-10788: [C++] Make S3 recursive tree walks parallel
    
    Use the AWS SDK async APIs to launch child directory reads concurrently as soon
    as we get the required information from a parent read.
    
    Similarly, issue directory tree deletion commands in parallel.
    
    On this machine, listing the entire directory tree at "s3://mf-nwp-models/arome-france/v2/2020-12-02"
    goes down from 12 seconds to 2 seconds (a 6x speedup, for 12944 directory entries).
    
    Closes #8818 from pitrou/ARROW-10788-s3-walk-async
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/filesystem/s3fs.cc                | 342 ++++++++++++++++--------
 cpp/src/arrow/filesystem/s3fs_narrative_test.cc |   2 +
 cpp/src/arrow/util/future.h                     |   9 +
 3 files changed, 237 insertions(+), 116 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 0218faf..1f6da9e 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <atomic>
 #include <condition_variable>
+#include <functional>
 #include <mutex>
 #include <sstream>
 #include <unordered_map>
@@ -75,6 +76,7 @@
 #include "arrow/status.h"
 #include "arrow/util/atomic_shared_ptr.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/windows_fixup.h"
 
@@ -1080,6 +1082,139 @@ void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
   info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
 }
 
+struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
+  using ResultHandler = std::function<Status(const std::string& prefix,
+                                             const S3Model::ListObjectsV2Result&)>;
+  using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
+  using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
+
+  Aws::S3::S3Client* client_;
+  const std::string bucket_;
+  const std::string base_dir_;
+  const int32_t max_keys_;
+  const ResultHandler result_handler_;
+  const ErrorHandler error_handler_;
+  const RecursionHandler recursion_handler_;
+
+  template <typename... Args>
+  static Status Walk(Args&&... args) {
+    auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
+    return self->DoWalk();
+  }
+
+  TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir,
+             int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler,
+             RecursionHandler recursion_handler)
+      : client_(std::move(client)),
+        bucket_(std::move(bucket)),
+        base_dir_(std::move(base_dir)),
+        max_keys_(max_keys),
+        result_handler_(std::move(result_handler)),
+        error_handler_(std::move(error_handler)),
+        recursion_handler_(std::move(recursion_handler)) {}
+
+ private:
+  std::mutex mutex_;
+  Future<> future_;
+  std::atomic<int32_t> num_in_flight_;
+
+  Status DoWalk() {
+    future_ = decltype(future_)::Make();
+    num_in_flight_ = 0;
+    WalkChild(base_dir_, /*nesting_depth=*/0);
+    // When this returns, ListObjectsV2 tasks either have finished or will exit early
+    return future_.status();
+  }
+
+  bool is_finished() const { return future_.is_finished(); }
+
+  void ListObjectsFinished(Status st) {
+    const auto in_flight = --num_in_flight_;
+    if (!st.ok() || !in_flight) {
+      future_.MarkFinished(std::move(st));
+    }
+  }
+
+  struct ListObjectsV2Handler {
+    std::shared_ptr<TreeWalker> walker;
+    std::string prefix;
+    int32_t nesting_depth;
+    S3Model::ListObjectsV2Request req;
+
+    void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&,
+                    const S3Model::ListObjectsV2Outcome& outcome,
+                    const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
+      // Serialize calls to operation-specific handlers
+      std::unique_lock<std::mutex> guard(walker->mutex_);
+      if (walker->is_finished()) {
+        // Early exit: avoid executing handlers if DoWalk() returned
+        return;
+      }
+      if (!outcome.IsSuccess()) {
+        Status st = walker->error_handler_(outcome.GetError());
+        walker->ListObjectsFinished(std::move(st));
+        return;
+      }
+      HandleResult(outcome.GetResult());
+    }
+
+    void HandleResult(const S3Model::ListObjectsV2Result& result) {
+      bool recurse = result.GetCommonPrefixes().size() > 0;
+      if (recurse) {
+        auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1);
+        if (!maybe_recurse.ok()) {
+          walker->ListObjectsFinished(maybe_recurse.status());
+          return;
+        }
+        recurse &= *maybe_recurse;
+      }
+      Status st = walker->result_handler_(prefix, result);
+      if (!st.ok()) {
+        walker->ListObjectsFinished(std::move(st));
+        return;
+      }
+      if (recurse) {
+        walker->WalkChildren(result, nesting_depth + 1);
+      }
+      // If the result was truncated, issue a continuation request to get
+      // further directory entries.
+      if (result.GetIsTruncated()) {
+        DCHECK(!result.GetNextContinuationToken().empty());
+        req.SetContinuationToken(result.GetNextContinuationToken());
+        walker->client_->ListObjectsV2Async(req, *this);
+      } else {
+        walker->ListObjectsFinished(Status::OK());
+      }
+    }
+
+    void Start() {
+      req.SetBucket(ToAwsString(walker->bucket_));
+      if (!prefix.empty()) {
+        req.SetPrefix(ToAwsString(prefix) + kSep);
+      }
+      req.SetDelimiter(Aws::String() + kSep);
+      req.SetMaxKeys(walker->max_keys_);
+      walker->client_->ListObjectsV2Async(req, *this);
+    }
+  };
+
+  void WalkChild(std::string key, int32_t nesting_depth) {
+    ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
+    ++num_in_flight_;
+    handler.Start();
+  }
+
+  void WalkChildren(const S3Model::ListObjectsV2Result& result, int32_t nesting_depth) {
+    for (const auto& prefix : result.GetCommonPrefixes()) {
+      const auto child_key =
+          internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
+      WalkChild(std::string{child_key}, nesting_depth);
+    }
+  }
+
+  friend struct ListObjectsV2Handler;
+};
+
 }  // namespace
 
 // -----------------------------------------------------------------------
@@ -1209,60 +1344,53 @@ class S3FileSystem::Impl {
         outcome.GetError());
   }
 
-  // List objects under a given prefix, issuing continuation requests if necessary
-  template <typename ResultCallable, typename ErrorCallable>
-  Status ListObjectsV2(const std::string& bucket, const std::string& prefix,
-                       ResultCallable&& result_callable, ErrorCallable&& error_callable) {
-    S3Model::ListObjectsV2Request req;
-    req.SetBucket(ToAwsString(bucket));
-    if (!prefix.empty()) {
-      req.SetPrefix(ToAwsString(prefix) + kSep);
-    }
-    req.SetDelimiter(Aws::String() + kSep);
-    req.SetMaxKeys(kListObjectsMaxKeys);
-
-    while (true) {
-      auto outcome = client_->ListObjectsV2(req);
-      if (!outcome.IsSuccess()) {
-        return error_callable(outcome.GetError());
-      }
-      const auto& result = outcome.GetResult();
-      RETURN_NOT_OK(result_callable(result));
-      // Was the result limited by max-keys? If so, use the continuation token
-      // to fetch further results.
-      if (!result.GetIsTruncated()) {
-        break;
-      }
-      DCHECK(!result.GetNextContinuationToken().empty());
-      req.SetContinuationToken(result.GetNextContinuationToken());
+  Status CheckNestingDepth(int32_t nesting_depth) {
+    if (nesting_depth >= kMaxNestingDepth) {
+      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
+                             kMaxNestingDepth, ")");
     }
     return Status::OK();
   }
 
-  // Recursive workhorse for GetTargetStats(FileSelector...)
+  // Workhorse for GetTargetStats(FileSelector...)
   Status Walk(const FileSelector& select, const std::string& bucket,
               const std::string& key, std::vector<FileInfo>* out) {
-    int32_t nesting_depth = 0;
-    return Walk(select, bucket, key, nesting_depth, out);
-  }
+    bool is_empty = true;
 
-  Status Walk(const FileSelector& select, const std::string& bucket,
-              const std::string& key, int32_t nesting_depth, std::vector<FileInfo>* out) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
-    }
+    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
+      if (select.allow_not_found && IsNotFound(error)) {
+        return Status::OK();
+      }
+      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
+                                                 "' in bucket '", bucket, "': "),
+                           error);
+    };
 
-    bool is_empty = true;
-    std::vector<std::string> child_keys;
+    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
+      return select.recursive && nesting_depth <= select.max_recursion;
+    };
 
-    auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status {
+    auto handle_results = [&](const std::string& prefix,
+                              const S3Model::ListObjectsV2Result& result) -> Status {
+      // Walk "directories"
+      for (const auto& prefix : result.GetCommonPrefixes()) {
+        is_empty = false;
+        const auto child_key =
+            internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
+        std::stringstream child_path;
+        child_path << bucket << kSep << child_key;
+        FileInfo info;
+        info.set_path(child_path.str());
+        info.set_type(FileType::Directory);
+        out->push_back(std::move(info));
+      }
       // Walk "files"
       for (const auto& obj : result.GetContents()) {
         is_empty = false;
         FileInfo info;
         const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
-        if (child_key == util::string_view(key)) {
+        if (child_key == util::string_view(prefix)) {
           // Amazon can return the "directory" key itself as part of the results, skip
           continue;
         }
@@ -1272,42 +1400,11 @@ class S3FileSystem::Impl {
         FileObjectToInfo(obj, &info);
         out->push_back(std::move(info));
       }
-      // Walk "directories"
-      for (const auto& prefix : result.GetCommonPrefixes()) {
-        is_empty = false;
-        const auto child_key =
-            internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
-        std::stringstream ss;
-        ss << bucket << kSep << child_key;
-        FileInfo info;
-        info.set_path(ss.str());
-        info.set_type(FileType::Directory);
-        out->push_back(std::move(info));
-        if (select.recursive) {
-          child_keys.emplace_back(child_key);
-        }
-      }
       return Status::OK();
     };
 
-    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
-      if (select.allow_not_found && IsNotFound(error)) {
-        return Status::OK();
-      }
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           error);
-    };
-
-    RETURN_NOT_OK(
-        ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));
-
-    // Recurse
-    if (select.recursive && nesting_depth < select.max_recursion) {
-      for (const auto& child_key : child_keys) {
-        RETURN_NOT_OK(Walk(select, bucket, child_key, nesting_depth + 1, out));
-      }
-    }
+    RETURN_NOT_OK(TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+                                   handle_results, handle_error, handle_recursion));
 
     // If no contents were found, perhaps it's an empty "directory",
     // or perhaps it's a nonexistent entry.  Check.
@@ -1317,36 +1414,25 @@ class S3FileSystem::Impl {
         return PathNotFound(bucket, key);
       }
     }
+    // Sort results for convenience, since they can come massively out of order
+    std::sort(out->begin(), out->end(), FileInfo::ByPath{});
     return Status::OK();
   }
 
   Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
                           std::vector<std::string>* file_keys,
                           std::vector<std::string>* dir_keys) {
-    int32_t nesting_depth = 0;
-    return WalkForDeleteDir(bucket, key, nesting_depth, file_keys, dir_keys);
-  }
-
-  Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
-                          int32_t nesting_depth, std::vector<std::string>* file_keys,
-                          std::vector<std::string>* dir_keys) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
-    }
-
-    std::vector<std::string> child_keys;
-
-    auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status {
+    auto handle_results = [&](const std::string& prefix,
+                              const S3Model::ListObjectsV2Result& result) -> Status {
       // Walk "files"
+      file_keys->reserve(file_keys->size() + result.GetContents().size());
       for (const auto& obj : result.GetContents()) {
         file_keys->emplace_back(FromAwsString(obj.GetKey()));
       }
       // Walk "directories"
+      dir_keys->reserve(dir_keys->size() + result.GetCommonPrefixes().size());
       for (const auto& prefix : result.GetCommonPrefixes()) {
-        auto child_key = FromAwsString(prefix.GetPrefix());
-        dir_keys->emplace_back(child_key);
-        child_keys.emplace_back(internal::RemoveTrailingSlash(child_key));
+        dir_keys->emplace_back(FromAwsString(prefix.GetPrefix()));
       }
       return Status::OK();
     };
@@ -1357,20 +1443,53 @@ class S3FileSystem::Impl {
                            error);
     };
 
-    RETURN_NOT_OK(
-        ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));
+    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
+      return true;  // Recurse
+    };
 
-    // Recurse
-    for (const auto& child_key : child_keys) {
-      RETURN_NOT_OK(
-          WalkForDeleteDir(bucket, child_key, nesting_depth + 1, file_keys, dir_keys));
-    }
-    return Status::OK();
+    return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+                            handle_results, handle_error, handle_recursion);
   }
 
   // Delete multiple objects at once
   Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
+    struct DeleteHandler {
+      Future<> future = Future<>::Make();
+
+      // Callback for DeleteObjectsAsync
+      void operator()(const Aws::S3::S3Client*, const S3Model::DeleteObjectsRequest& req,
+                      const S3Model::DeleteObjectsOutcome& outcome,
+                      const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
+        if (!outcome.IsSuccess()) {
+          future.MarkFinished(ErrorToStatus(outcome.GetError()));
+          return;
+        }
+        // Also need to check per-key errors, even on successful outcome
+        // See
+        // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
+        const auto& errors = outcome.GetResult().GetErrors();
+        if (!errors.empty()) {
+          std::stringstream ss;
+          ss << "Got the following " << errors.size()
+             << " errors when deleting objects in S3 bucket '" << req.GetBucket()
+             << "':\n";
+          for (const auto& error : errors) {
+            ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
+          }
+          future.MarkFinished(Status::IOError(ss.str()));
+        } else {
+          future.MarkFinished();
+        }
+      }
+    };
+
     const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
+    std::vector<DeleteHandler> delete_handlers;
+    std::vector<Future<>*> futures;
+    delete_handlers.reserve(keys.size() / chunk_size + 1);
+    futures.reserve(delete_handlers.capacity());
+
     for (size_t start = 0; start < keys.size(); start += chunk_size) {
       S3Model::DeleteObjectsRequest req;
       S3Model::Delete del;
@@ -1379,23 +1498,14 @@ class S3FileSystem::Impl {
       }
       req.SetBucket(ToAwsString(bucket));
       req.SetDelete(std::move(del));
-      auto outcome = client_->DeleteObjects(req);
-      if (!outcome.IsSuccess()) {
-        return ErrorToStatus(outcome.GetError());
-      }
-      // Also need to check per-key errors, even on successful outcome
-      // See
-      // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
-      const auto& errors = outcome.GetResult().GetErrors();
-      if (!errors.empty()) {
-        std::stringstream ss;
-        ss << "Got the following " << errors.size()
-           << " errors when deleting objects in S3 bucket '" << bucket << "':\n";
-        for (const auto& error : errors) {
-          ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
-        }
-        return Status::IOError(ss.str());
-      }
+      delete_handlers.emplace_back();
+      futures.push_back(&delete_handlers.back().future);
+      client_->DeleteObjectsAsync(req, delete_handlers.back());
+    }
+
+    WaitForAll(futures);
+    for (const auto* fut : futures) {
+      RETURN_NOT_OK(fut->status());
     }
     return Status::OK();
   }
diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
index 0beb51d..f65ccba 100644
--- a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
@@ -138,6 +138,7 @@ void TestBucket(int argc, char** argv) {
   select.allow_not_found = false;
   ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
   ASSERT_EQ(infos.size(), 2);
+  SortInfos(&infos);
   AssertFileInfo(infos[0], "Dir1/File2", FileType::File, 11);
   AssertFileInfo(infos[1], "Dir1/Subdir", FileType::Directory);
 
@@ -145,6 +146,7 @@ void TestBucket(int argc, char** argv) {
   select.recursive = true;
   ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
   ASSERT_EQ(infos.size(), 2);
+  SortInfos(&infos);
   AssertFileInfo(infos[0], "Dir2/Subdir", FileType::Directory);
   AssertFileInfo(infos[1], "Dir2/Subdir/File3", FileType::File, 10);
 
diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index ab509e5..1d2aedf 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -187,6 +187,15 @@ class Future {
     return impl_->state();
   }
 
+  /// \brief Whether the Future is finished
+  ///
+  /// A false return value is only indicative, as the Future can complete
+  /// concurrently.  A true return value is definitive, though.
+  bool is_finished() const {
+    CheckValid();
+    return IsFutureFinished(impl_->state());
+  }
+
   /// \brief Wait for the Future to complete and return its Result
   const Result<ValueType>& result() const& {
     Wait();