You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/07 19:48:11 UTC

[GitHub] [arrow] bkietz commented on a change in pull request #8818: ARROW-10788: [C++] Make S3 recursive tree walks parallel

bkietz commented on a change in pull request #8818:
URL: https://github.com/apache/arrow/pull/8818#discussion_r537767675



##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1080,6 +1082,134 @@ void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
   info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
 }
 
+struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
+  using ResultHandler = std::function<Status(const std::string& prefix,
+                                             const S3Model::ListObjectsV2Result&)>;
+  using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
+  using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
+
+  Aws::S3::S3Client* client_;
+  const std::string bucket_;
+  const std::string base_dir_;
+  const int32_t max_keys_;
+  const ResultHandler result_handler_;
+  const ErrorHandler error_handler_;
+  const RecursionHandler recursion_handler_;
+
+  template <typename... Args>
+  static Status Walk(Args&&... args) {
+    auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
+    return self->DoWalk();
+  }
+
+  TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir,
+             int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler,
+             RecursionHandler recursion_handler)
+      : client_(std::move(client)),
+        bucket_(std::move(bucket)),
+        base_dir_(std::move(base_dir)),
+        max_keys_(max_keys),
+        result_handler_(std::move(result_handler)),
+        error_handler_(std::move(error_handler)),
+        recursion_handler_(std::move(recursion_handler)) {}
+
+ private:
+  std::mutex mutex_;
+  Future<> future_;
+  std::atomic<int32_t> num_in_flight_;
+
+  Status DoWalk() {
+    future_ = decltype(future_)::Make();
+    num_in_flight_ = 0;
+    WalkChild(base_dir_, /*nesting_depth=*/0);
+    // When this returns, ListObjectsV2 tasks either have finished or will exit early
+    return future_.status();
+  }
+
+  bool is_finished() const { return future_.is_finished(); }
+
+  void ListObjectsFinished(Status st) {
+    const auto in_flight = --num_in_flight_;
+    if (!st.ok() || !in_flight) {
+      future_.MarkFinished(std::move(st));
+    }
+  }
+
+  struct ListObjectsV2Handler {
+    std::shared_ptr<TreeWalker> walker;
+    std::string prefix;
+    int32_t nesting_depth;
+    S3Model::ListObjectsV2Request req;
+
+    void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&,
+                    const S3Model::ListObjectsV2Outcome& outcome,
+                    const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {

Review comment:
       Nit: this would be more readable if you rewrote it as:
   ```c++
   Result<bool> DoHandle(... args);
   
   void operator()(... args) {
     std::unique_lock<std::mutex> guard(walker->mutex_);
     auto maybe_truncated = DoHandle(args...);
     if (maybe_truncated.ok() && *maybe_truncated) {
       DCHECK(!result.GetNextContinuationToken().empty());
       req.SetContinuationToken(result.GetNextContinuationToken());
       walker->client_->ListObjectsV2Async(req, *this);
       return;
     }
     walker->ListObjectsFinished(std::move(maybetruncated).status());
   }
   ```

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1242,27 +1372,45 @@ class S3FileSystem::Impl {
   // Recursive workhorse for GetTargetStats(FileSelector...)
   Status Walk(const FileSelector& select, const std::string& bucket,
               const std::string& key, std::vector<FileInfo>* out) {
-    int32_t nesting_depth = 0;
-    return Walk(select, bucket, key, nesting_depth, out);
-  }
+    bool is_empty = true;
 
-  Status Walk(const FileSelector& select, const std::string& bucket,
-              const std::string& key, int32_t nesting_depth, std::vector<FileInfo>* out) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
-    }
+    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
+      if (select.allow_not_found && IsNotFound(error)) {
+        return Status::OK();
+      }
+      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
+                                                 "' in bucket '", bucket, "': "),
+                           error);
+    };
 
-    bool is_empty = true;
-    std::vector<std::string> child_keys;
+    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+      if (nesting_depth >= kMaxNestingDepth) {
+        return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
+                               kMaxNestingDepth, ")");
+      }

Review comment:
       Since this depth check is common to any recursive walk, should it be inlined in `ListObjectsV2Handler::DoHandle` or somewhere similar?

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1080,6 +1082,134 @@ void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
   info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
 }
 
+struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
+  using ResultHandler = std::function<Status(const std::string& prefix,
+                                             const S3Model::ListObjectsV2Result&)>;
+  using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
+  using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
+
+  Aws::S3::S3Client* client_;
+  const std::string bucket_;
+  const std::string base_dir_;
+  const int32_t max_keys_;
+  const ResultHandler result_handler_;
+  const ErrorHandler error_handler_;
+  const RecursionHandler recursion_handler_;
+
+  template <typename... Args>
+  static Status Walk(Args&&... args) {
+    auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
+    return self->DoWalk();
+  }
+
+  TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir,
+             int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler,
+             RecursionHandler recursion_handler)
+      : client_(std::move(client)),
+        bucket_(std::move(bucket)),
+        base_dir_(std::move(base_dir)),
+        max_keys_(max_keys),
+        result_handler_(std::move(result_handler)),
+        error_handler_(std::move(error_handler)),
+        recursion_handler_(std::move(recursion_handler)) {}
+
+ private:
+  std::mutex mutex_;
+  Future<> future_;
+  std::atomic<int32_t> num_in_flight_;
+
+  Status DoWalk() {
+    future_ = decltype(future_)::Make();
+    num_in_flight_ = 0;
+    WalkChild(base_dir_, /*nesting_depth=*/0);
+    // When this returns, ListObjectsV2 tasks either have finished or will exit early
+    return future_.status();
+  }
+
+  bool is_finished() const { return future_.is_finished(); }
+
+  void ListObjectsFinished(Status st) {
+    const auto in_flight = --num_in_flight_;
+    if (!st.ok() || !in_flight) {
+      future_.MarkFinished(std::move(st));
+    }
+  }
+
+  struct ListObjectsV2Handler {
+    std::shared_ptr<TreeWalker> walker;
+    std::string prefix;
+    int32_t nesting_depth;
+    S3Model::ListObjectsV2Request req;
+
+    void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&,
+                    const S3Model::ListObjectsV2Outcome& outcome,
+                    const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
+      // Serialize calls to operation-specific handlers
+      std::unique_lock<std::mutex> guard(walker->mutex_);
+      if (walker->is_finished()) {
+        // Early exit: avoid executing handlers if DoWalk() returned
+        return;
+      }
+      if (!outcome.IsSuccess()) {
+        Status st = walker->error_handler_(outcome.GetError());
+        walker->ListObjectsFinished(std::move(st));
+        return;
+      }
+      const auto& result = outcome.GetResult();
+      bool recurse = result.GetCommonPrefixes().size() > 0;
+      if (recurse) {
+        auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1);
+        if (!maybe_recurse.ok()) {
+          walker->ListObjectsFinished(maybe_recurse.status());
+          return;
+        }
+        recurse &= *maybe_recurse;
+      }
+      Status st = walker->result_handler_(prefix, result);
+      if (!st.ok()) {
+        walker->ListObjectsFinished(std::move(st));
+        return;
+      }
+      if (recurse) {
+        walker->WalkChildren(result, nesting_depth + 1);
+      }
+      if (result.GetIsTruncated()) {
+        DCHECK(!result.GetNextContinuationToken().empty());
+        req.SetContinuationToken(result.GetNextContinuationToken());
+        walker->client_->ListObjectsV2Async(req, *this);
+      } else {
+        walker->ListObjectsFinished(Status::OK());
+      }
+    }
+
+    void Start() {
+      req.SetBucket(ToAwsString(walker->bucket_));
+      if (!prefix.empty()) {
+        req.SetPrefix(ToAwsString(prefix) + kSep);
+      }
+      req.SetDelimiter(Aws::String() + kSep);
+      req.SetMaxKeys(walker->max_keys_);
+      walker->client_->ListObjectsV2Async(req, *this);
+    }
+  };
+
+  void WalkChild(std::string key, int32_t nesting_depth) {
+    ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
+    ++num_in_flight_;
+    handler.Start();

Review comment:
       Nit: having a `Start` method on a handler is counterintuitive. I'd prefer that it be inlined here and that `req` not be a member. (It's referenced in `ListObjectsV2Handler::operator()`, but it still doesn't need to be a member, since the same `ListObjectsV2Request` is passed to that operator as an argument.)

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1317,36 +1434,24 @@ class S3FileSystem::Impl {
         return PathNotFound(bucket, key);
       }
     }
+    // TODO sort results for convenience?

Review comment:
       I think we probably should, if only to establish a consistent result order for testing.

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1379,23 +1520,16 @@ class S3FileSystem::Impl {
       }
       req.SetBucket(ToAwsString(bucket));
       req.SetDelete(std::move(del));
-      auto outcome = client_->DeleteObjects(req);
-      if (!outcome.IsSuccess()) {
-        return ErrorToStatus(outcome.GetError());
-      }
-      // Also need to check per-key errors, even on successful outcome
-      // See
-      // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
-      const auto& errors = outcome.GetResult().GetErrors();
-      if (!errors.empty()) {
-        std::stringstream ss;
-        ss << "Got the following " << errors.size()
-           << " errors when deleting objects in S3 bucket '" << bucket << "':\n";
-        for (const auto& error : errors) {
-          ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
-        }
-        return Status::IOError(ss.str());
-      }
+      deleters.emplace_back();
+      client_->DeleteObjectsAsync(req, deleters.back());
+    }
+
+    std::vector<Future<>*> futures(deleters.size());
+    std::transform(deleters.begin(), deleters.end(), futures.begin(),
+                   [](Deleter& del) { return &del.future; });

Review comment:
       ```suggestion
         futures.push_back(&deleters.back().future);
       }
   ```

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1357,20 +1462,56 @@ class S3FileSystem::Impl {
                            error);
     };
 
-    RETURN_NOT_OK(
-        ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));
+    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+      if (nesting_depth >= kMaxNestingDepth) {
+        return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
+                               kMaxNestingDepth, ")");
+      }
+      return true;  // Recurse
+    };
 
-    // Recurse
-    for (const auto& child_key : child_keys) {
-      RETURN_NOT_OK(
-          WalkForDeleteDir(bucket, child_key, nesting_depth + 1, file_keys, dir_keys));
-    }
-    return Status::OK();
+    return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+                            handle_results, handle_error, handle_recursion);
   }
 
   // Delete multiple objects at once
   Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
+    struct Deleter {
+      Future<> future;
+
+      Deleter() : future(Future<>::Make()) {}

Review comment:
       ```suggestion
       struct DeleteHandler {
         Future<> future = Future<>::Make();
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org