You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/08 09:57:26 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #8818: ARROW-10788: [C++] Make S3 recursive tree walks parallel

pitrou commented on a change in pull request #8818:
URL: https://github.com/apache/arrow/pull/8818#discussion_r538196698



##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1080,6 +1082,134 @@ void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
   info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
 }
 
+struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
+  using ResultHandler = std::function<Status(const std::string& prefix,
+                                             const S3Model::ListObjectsV2Result&)>;
+  using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
+  using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
+
+  Aws::S3::S3Client* client_;
+  const std::string bucket_;
+  const std::string base_dir_;
+  const int32_t max_keys_;
+  const ResultHandler result_handler_;
+  const ErrorHandler error_handler_;
+  const RecursionHandler recursion_handler_;
+
+  template <typename... Args>
+  static Status Walk(Args&&... args) {
+    auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
+    return self->DoWalk();
+  }
+
+  TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir,
+             int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler,
+             RecursionHandler recursion_handler)
+      : client_(std::move(client)),
+        bucket_(std::move(bucket)),
+        base_dir_(std::move(base_dir)),
+        max_keys_(max_keys),
+        result_handler_(std::move(result_handler)),
+        error_handler_(std::move(error_handler)),
+        recursion_handler_(std::move(recursion_handler)) {}
+
+ private:
+  std::mutex mutex_;
+  Future<> future_;
+  std::atomic<int32_t> num_in_flight_;
+
+  Status DoWalk() {
+    future_ = decltype(future_)::Make();
+    num_in_flight_ = 0;
+    WalkChild(base_dir_, /*nesting_depth=*/0);
+    // When this returns, ListObjectsV2 tasks either have finished or will exit early
+    return future_.status();
+  }
+
+  bool is_finished() const { return future_.is_finished(); }
+
+  void ListObjectsFinished(Status st) {
+    const auto in_flight = --num_in_flight_;
+    if (!st.ok() || !in_flight) {
+      future_.MarkFinished(std::move(st));
+    }
+  }
+
+  struct ListObjectsV2Handler {
+    std::shared_ptr<TreeWalker> walker;
+    std::string prefix;
+    int32_t nesting_depth;
+    S3Model::ListObjectsV2Request req;
+
+    void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&,
+                    const S3Model::ListObjectsV2Outcome& outcome,
+                    const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
+      // Serialize calls to operation-specific handlers
+      std::unique_lock<std::mutex> guard(walker->mutex_);
+      if (walker->is_finished()) {
+        // Early exit: avoid executing handlers if DoWalk() returned
+        return;
+      }
+      if (!outcome.IsSuccess()) {
+        Status st = walker->error_handler_(outcome.GetError());
+        walker->ListObjectsFinished(std::move(st));
+        return;
+      }
+      const auto& result = outcome.GetResult();
+      bool recurse = result.GetCommonPrefixes().size() > 0;
+      if (recurse) {
+        auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1);
+        if (!maybe_recurse.ok()) {
+          walker->ListObjectsFinished(maybe_recurse.status());
+          return;
+        }
+        recurse &= *maybe_recurse;
+      }
+      Status st = walker->result_handler_(prefix, result);
+      if (!st.ok()) {
+        walker->ListObjectsFinished(std::move(st));
+        return;
+      }
+      if (recurse) {
+        walker->WalkChildren(result, nesting_depth + 1);
+      }
+      if (result.GetIsTruncated()) {
+        DCHECK(!result.GetNextContinuationToken().empty());
+        req.SetContinuationToken(result.GetNextContinuationToken());
+        walker->client_->ListObjectsV2Async(req, *this);
+      } else {
+        walker->ListObjectsFinished(Status::OK());
+      }
+    }
+
+    void Start() {
+      req.SetBucket(ToAwsString(walker->bucket_));
+      if (!prefix.empty()) {
+        req.SetPrefix(ToAwsString(prefix) + kSep);
+      }
+      req.SetDelimiter(Aws::String() + kSep);
+      req.SetMaxKeys(walker->max_keys_);
+      walker->client_->ListObjectsV2Async(req, *this);
+    }
+  };
+
+  void WalkChild(std::string key, int32_t nesting_depth) {
+    ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
+    ++num_in_flight_;
+    handler.Start();

Review comment:
       `ListObjectsV2Handler::operator()` only receives the request as a const reference, though, so the handler needs its own copy to update (e.g. to set the continuation token). By contrast, `WalkChild` has no use for the request object except to pass it to the handler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org