You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2020/12/08 12:19:10 UTC
[arrow] branch master updated: ARROW-10788: [C++] Make S3 recursive tree walks parallel
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d1b8ac0 ARROW-10788: [C++] Make S3 recursive tree walks parallel
d1b8ac0 is described below
commit d1b8ac07ef5b6f729e147458930e3e6196602dca
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Dec 8 13:18:14 2020 +0100
ARROW-10788: [C++] Make S3 recursive tree walks parallel
Use the AWS SDK async APIs to launch child directory reads concurrently as soon
as we get the required information from a parent read.
Also, similarly issue directory tree deletion commands in parallel.
On this machine, listing the entire directory tree at "s3://mf-nwp-models/arome-france/v2/2020-12-02"
goes down from 12 seconds to 2 seconds (a 6x speedup, for 12944 directory entries).
Closes #8818 from pitrou/ARROW-10788-s3-walk-async
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/filesystem/s3fs.cc | 342 ++++++++++++++++--------
cpp/src/arrow/filesystem/s3fs_narrative_test.cc | 2 +
cpp/src/arrow/util/future.h | 9 +
3 files changed, 237 insertions(+), 116 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 0218faf..1f6da9e 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <atomic>
#include <condition_variable>
+#include <functional>
#include <mutex>
#include <sstream>
#include <unordered_map>
@@ -75,6 +76,7 @@
#include "arrow/status.h"
#include "arrow/util/atomic_shared_ptr.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/windows_fixup.h"
@@ -1080,6 +1082,139 @@ void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
}
+struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {  // Walks an S3 prefix tree with async ListObjectsV2 requests
+  using ResultHandler = std::function<Status(const std::string& prefix,
+                                             const S3Model::ListObjectsV2Result&)>;  // invoked per result page with the listed prefix
+  using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;  // maps a failed listing outcome to a Status
+  using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;  // receives the child depth; true means descend
+
+  Aws::S3::S3Client* client_;  // raw pointer; this struct never deletes it
+  const std::string bucket_;
+  const std::string base_dir_;  // prefix at which the walk starts (nesting depth 0)
+  const int32_t max_keys_;  // forwarded to SetMaxKeys() on every request
+  const ResultHandler result_handler_;
+  const ErrorHandler error_handler_;
+  const RecursionHandler recursion_handler_;
+
+  template <typename... Args>
+  static Status Walk(Args&&... args) {  // entry point: builds a walker and returns DoWalk()'s status
+    auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);  // shared so handlers can hold a reference via shared_from_this()
+    return self->DoWalk();
+  }
+
+  TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir,
+             int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler,
+             RecursionHandler recursion_handler)
+      : client_(std::move(client)),
+        bucket_(std::move(bucket)),
+        base_dir_(std::move(base_dir)),
+        max_keys_(max_keys),
+        result_handler_(std::move(result_handler)),
+        error_handler_(std::move(error_handler)),
+        recursion_handler_(std::move(recursion_handler)) {}
+
+ private:
+  std::mutex mutex_;  // held for the whole body of each listing callback
+  Future<> future_;  // marked finished on the first error or when no listing remains in flight
+  std::atomic<int32_t> num_in_flight_;  // incremented in WalkChild(), decremented in ListObjectsFinished()
+
+  Status DoWalk() {
+    future_ = decltype(future_)::Make();
+    num_in_flight_ = 0;
+    WalkChild(base_dir_, /*nesting_depth=*/0);
+    // When this returns, ListObjectsV2 tasks either have finished or will exit early
+    return future_.status();
+  }
+
+  bool is_finished() const { return future_.is_finished(); }
+
+  void ListObjectsFinished(Status st) {  // called once when the listing of one prefix ends (successfully or not)
+    const auto in_flight = --num_in_flight_;
+    if (!st.ok() || !in_flight) {  // a failure, or the last outstanding listing completed
+      future_.MarkFinished(std::move(st));
+    }
+  }
+
+  struct ListObjectsV2Handler {  // callback object passed (by copy) to ListObjectsV2Async for one prefix
+    std::shared_ptr<TreeWalker> walker;
+    std::string prefix;  // key being listed, without trailing separator (Start() appends kSep)
+    int32_t nesting_depth;
+    S3Model::ListObjectsV2Request req;  // kept so continuation requests can reuse it
+
+    void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&,
+                    const S3Model::ListObjectsV2Outcome& outcome,
+                    const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
+      // Serialize calls to operation-specific handlers
+      std::unique_lock<std::mutex> guard(walker->mutex_);
+      if (walker->is_finished()) {
+        // Early exit: avoid executing handlers if DoWalk() returned
+        return;
+      }
+      if (!outcome.IsSuccess()) {
+        Status st = walker->error_handler_(outcome.GetError());  // the handler may turn the error into OK
+        walker->ListObjectsFinished(std::move(st));
+        return;
+      }
+      HandleResult(outcome.GetResult());
+    }
+
+    void HandleResult(const S3Model::ListObjectsV2Result& result) {  // runs with walker->mutex_ held by operator()
+      bool recurse = result.GetCommonPrefixes().size() > 0;  // recursion handler is consulted only if child prefixes exist
+      if (recurse) {
+        auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1);
+        if (!maybe_recurse.ok()) {
+          walker->ListObjectsFinished(maybe_recurse.status());
+          return;
+        }
+        recurse &= *maybe_recurse;
+      }
+      Status st = walker->result_handler_(prefix, result);
+      if (!st.ok()) {
+        walker->ListObjectsFinished(std::move(st));
+        return;
+      }
+      if (recurse) {
+        walker->WalkChildren(result, nesting_depth + 1);  // each child adds one to num_in_flight_
+      }
+      // If the result was truncated, issue a continuation request to get
+      // further directory entries.
+      if (result.GetIsTruncated()) {
+        DCHECK(!result.GetNextContinuationToken().empty());
+        req.SetContinuationToken(result.GetNextContinuationToken());
+        walker->client_->ListObjectsV2Async(req, *this);  // num_in_flight_ is left unchanged for the continuation
+      } else {
+        walker->ListObjectsFinished(Status::OK());
+      }
+    }
+
+    void Start() {  // fills in the request and issues the first listing for this prefix
+      req.SetBucket(ToAwsString(walker->bucket_));
+      if (!prefix.empty()) {
+        req.SetPrefix(ToAwsString(prefix) + kSep);
+      }
+      req.SetDelimiter(Aws::String() + kSep);
+      req.SetMaxKeys(walker->max_keys_);
+      walker->client_->ListObjectsV2Async(req, *this);
+    }
+  };
+
+  void WalkChild(std::string key, int32_t nesting_depth) {
+    ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
+    ++num_in_flight_;  // counted before the request is issued
+    handler.Start();
+  }
+
+  void WalkChildren(const S3Model::ListObjectsV2Result& result, int32_t nesting_depth) {
+    for (const auto& prefix : result.GetCommonPrefixes()) {
+      const auto child_key =
+          internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));  // Start() appends kSep again
+      WalkChild(std::string{child_key}, nesting_depth);
+    }
+  }
+
+  friend struct ListObjectsV2Handler;
+};
+
} // namespace
// -----------------------------------------------------------------------
@@ -1209,60 +1344,53 @@ class S3FileSystem::Impl {
outcome.GetError());
}
- // List objects under a given prefix, issuing continuation requests if necessary
- template <typename ResultCallable, typename ErrorCallable>
- Status ListObjectsV2(const std::string& bucket, const std::string& prefix,
- ResultCallable&& result_callable, ErrorCallable&& error_callable) {
- S3Model::ListObjectsV2Request req;
- req.SetBucket(ToAwsString(bucket));
- if (!prefix.empty()) {
- req.SetPrefix(ToAwsString(prefix) + kSep);
- }
- req.SetDelimiter(Aws::String() + kSep);
- req.SetMaxKeys(kListObjectsMaxKeys);
-
- while (true) {
- auto outcome = client_->ListObjectsV2(req);
- if (!outcome.IsSuccess()) {
- return error_callable(outcome.GetError());
- }
- const auto& result = outcome.GetResult();
- RETURN_NOT_OK(result_callable(result));
- // Was the result limited by max-keys? If so, use the continuation token
- // to fetch further results.
- if (!result.GetIsTruncated()) {
- break;
- }
- DCHECK(!result.GetNextContinuationToken().empty());
- req.SetContinuationToken(result.GetNextContinuationToken());
+  Status CheckNestingDepth(int32_t nesting_depth) {  // IOError once nesting_depth reaches kMaxNestingDepth, OK otherwise
+    if (nesting_depth >= kMaxNestingDepth) {
+      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
+                             kMaxNestingDepth, ")");
}
return Status::OK();
}
- // Recursive workhorse for GetTargetStats(FileSelector...)
+ // Workhorse for GetTargetStats(FileSelector...)
Status Walk(const FileSelector& select, const std::string& bucket,
const std::string& key, std::vector<FileInfo>* out) {
- int32_t nesting_depth = 0;
- return Walk(select, bucket, key, nesting_depth, out);
- }
+ bool is_empty = true;
- Status Walk(const FileSelector& select, const std::string& bucket,
- const std::string& key, int32_t nesting_depth, std::vector<FileInfo>* out) {
- if (nesting_depth >= kMaxNestingDepth) {
- return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
- kMaxNestingDepth, ")");
- }
+ auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
+ if (select.allow_not_found && IsNotFound(error)) {
+ return Status::OK();
+ }
+ return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
+ "' in bucket '", bucket, "': "),
+ error);
+ };
- bool is_empty = true;
- std::vector<std::string> child_keys;
+ auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+ RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
+ return select.recursive && nesting_depth <= select.max_recursion;
+ };
- auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status {
+ auto handle_results = [&](const std::string& prefix,
+ const S3Model::ListObjectsV2Result& result) -> Status {
+ // Walk "directories"
+ for (const auto& prefix : result.GetCommonPrefixes()) {
+ is_empty = false;
+ const auto child_key =
+ internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
+ std::stringstream child_path;
+ child_path << bucket << kSep << child_key;
+ FileInfo info;
+ info.set_path(child_path.str());
+ info.set_type(FileType::Directory);
+ out->push_back(std::move(info));
+ }
// Walk "files"
for (const auto& obj : result.GetContents()) {
is_empty = false;
FileInfo info;
const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
- if (child_key == util::string_view(key)) {
+ if (child_key == util::string_view(prefix)) {
// Amazon can return the "directory" key itself as part of the results, skip
continue;
}
@@ -1272,42 +1400,11 @@ class S3FileSystem::Impl {
FileObjectToInfo(obj, &info);
out->push_back(std::move(info));
}
- // Walk "directories"
- for (const auto& prefix : result.GetCommonPrefixes()) {
- is_empty = false;
- const auto child_key =
- internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
- std::stringstream ss;
- ss << bucket << kSep << child_key;
- FileInfo info;
- info.set_path(ss.str());
- info.set_type(FileType::Directory);
- out->push_back(std::move(info));
- if (select.recursive) {
- child_keys.emplace_back(child_key);
- }
- }
return Status::OK();
};
- auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
- if (select.allow_not_found && IsNotFound(error)) {
- return Status::OK();
- }
- return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
- "' in bucket '", bucket, "': "),
- error);
- };
-
- RETURN_NOT_OK(
- ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));
-
- // Recurse
- if (select.recursive && nesting_depth < select.max_recursion) {
- for (const auto& child_key : child_keys) {
- RETURN_NOT_OK(Walk(select, bucket, child_key, nesting_depth + 1, out));
- }
- }
+ RETURN_NOT_OK(TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+ handle_results, handle_error, handle_recursion));
// If no contents were found, perhaps it's an empty "directory",
// or perhaps it's a nonexistent entry. Check.
@@ -1317,36 +1414,25 @@ class S3FileSystem::Impl {
return PathNotFound(bucket, key);
}
}
+ // Sort results for convenience, since they can come massively out of order
+ std::sort(out->begin(), out->end(), FileInfo::ByPath{});
return Status::OK();
}
Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
std::vector<std::string>* file_keys,
std::vector<std::string>* dir_keys) {
- int32_t nesting_depth = 0;
- return WalkForDeleteDir(bucket, key, nesting_depth, file_keys, dir_keys);
- }
-
- Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
- int32_t nesting_depth, std::vector<std::string>* file_keys,
- std::vector<std::string>* dir_keys) {
- if (nesting_depth >= kMaxNestingDepth) {
- return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
- kMaxNestingDepth, ")");
- }
-
- std::vector<std::string> child_keys;
-
- auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status {
+ auto handle_results = [&](const std::string& prefix,
+ const S3Model::ListObjectsV2Result& result) -> Status {
// Walk "files"
+ file_keys->reserve(file_keys->size() + result.GetContents().size());
for (const auto& obj : result.GetContents()) {
file_keys->emplace_back(FromAwsString(obj.GetKey()));
}
// Walk "directories"
+ dir_keys->reserve(dir_keys->size() + result.GetCommonPrefixes().size());
for (const auto& prefix : result.GetCommonPrefixes()) {
- auto child_key = FromAwsString(prefix.GetPrefix());
- dir_keys->emplace_back(child_key);
- child_keys.emplace_back(internal::RemoveTrailingSlash(child_key));
+ dir_keys->emplace_back(FromAwsString(prefix.GetPrefix()));
}
return Status::OK();
};
@@ -1357,20 +1443,53 @@ class S3FileSystem::Impl {
error);
};
- RETURN_NOT_OK(
- ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));
+ auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+ RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
+ return true; // Recurse
+ };
- // Recurse
- for (const auto& child_key : child_keys) {
- RETURN_NOT_OK(
- WalkForDeleteDir(bucket, child_key, nesting_depth + 1, file_keys, dir_keys));
- }
- return Status::OK();
+ return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+ handle_results, handle_error, handle_recursion);
}
// Delete multiple objects at once
Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
+    struct DeleteHandler {  // completion callback for one DeleteObjectsAsync request
+      Future<> future = Future<>::Make();  // marked finished once per callback invocation, on every path below
+
+      // Callback for DeleteObjectsAsync
+      void operator()(const Aws::S3::S3Client*, const S3Model::DeleteObjectsRequest& req,
+                      const S3Model::DeleteObjectsOutcome& outcome,
+                      const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
+        if (!outcome.IsSuccess()) {  // the request as a whole failed
+          future.MarkFinished(ErrorToStatus(outcome.GetError()));
+          return;
+        }
+        // Also need to check per-key errors, even on successful outcome
+        // See
+        // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
+        const auto& errors = outcome.GetResult().GetErrors();
+        if (!errors.empty()) {
+          std::stringstream ss;  // one IOError message listing every failed key
+          ss << "Got the following " << errors.size()
+             << " errors when deleting objects in S3 bucket '" << req.GetBucket()
+             << "':\n";
+          for (const auto& error : errors) {
+            ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
+          }
+          future.MarkFinished(Status::IOError(ss.str()));
+        } else {
+          future.MarkFinished();  // no request-level error and no per-key errors
+        }
+      }
+    };
+
const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
+ std::vector<DeleteHandler> delete_handlers;
+ std::vector<Future<>*> futures;
+ delete_handlers.reserve(keys.size() / chunk_size + 1);
+ futures.reserve(delete_handlers.capacity());
+
for (size_t start = 0; start < keys.size(); start += chunk_size) {
S3Model::DeleteObjectsRequest req;
S3Model::Delete del;
@@ -1379,23 +1498,14 @@ class S3FileSystem::Impl {
}
req.SetBucket(ToAwsString(bucket));
req.SetDelete(std::move(del));
- auto outcome = client_->DeleteObjects(req);
- if (!outcome.IsSuccess()) {
- return ErrorToStatus(outcome.GetError());
- }
- // Also need to check per-key errors, even on successful outcome
- // See
- // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
- const auto& errors = outcome.GetResult().GetErrors();
- if (!errors.empty()) {
- std::stringstream ss;
- ss << "Got the following " << errors.size()
- << " errors when deleting objects in S3 bucket '" << bucket << "':\n";
- for (const auto& error : errors) {
- ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
- }
- return Status::IOError(ss.str());
- }
+ delete_handlers.emplace_back();
+ futures.push_back(&delete_handlers.back().future);
+ client_->DeleteObjectsAsync(req, delete_handlers.back());
+ }
+
+ WaitForAll(futures);
+ for (const auto* fut : futures) {
+ RETURN_NOT_OK(fut->status());
}
return Status::OK();
}
diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
index 0beb51d..f65ccba 100644
--- a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
@@ -138,6 +138,7 @@ void TestBucket(int argc, char** argv) {
select.allow_not_found = false;
ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
ASSERT_EQ(infos.size(), 2);
+ SortInfos(&infos);
AssertFileInfo(infos[0], "Dir1/File2", FileType::File, 11);
AssertFileInfo(infos[1], "Dir1/Subdir", FileType::Directory);
@@ -145,6 +146,7 @@ void TestBucket(int argc, char** argv) {
select.recursive = true;
ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
ASSERT_EQ(infos.size(), 2);
+ SortInfos(&infos);
AssertFileInfo(infos[0], "Dir2/Subdir", FileType::Directory);
AssertFileInfo(infos[1], "Dir2/Subdir/File3", FileType::File, 10);
diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index ab509e5..1d2aedf 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -187,6 +187,15 @@ class Future {
return impl_->state();
}
+  /// \brief Whether the Future is finished
+  ///
+  /// A false return value is only indicative, as the Future can complete
+  /// concurrently.  A true return value is definitive, though.
+  bool is_finished() const {
+    CheckValid();  // NOTE(review): presumably rejects a Future with no underlying state; confirm in CheckValid()
+    return IsFutureFinished(impl_->state());  // state is read once, at call time
+  }
+
/// \brief Wait for the Future to complete and return its Result
const Result<ValueType>& result() const& {
Wait();