You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/23 19:53:20 UTC
[arrow] branch master updated: ARROW-18397: [C++] Clear S3 region resolver client at S3 shutdown (#14718)
This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9a2aef7d9e ARROW-18397: [C++] Clear S3 region resolver client at S3 shutdown (#14718)
9a2aef7d9e is described below
commit 9a2aef7d9e6adb7a0630022aebe4e3de46145af2
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Nov 23 20:53:10 2022 +0100
ARROW-18397: [C++] Clear S3 region resolver client at S3 shutdown (#14718)
This should hopefully suppress a failed assertion on recent AWS SDK versions.
See https://github.com/aws/aws-sdk-cpp/issues/2204 for upstream issue report.
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
ci/conda_env_cpp.txt | 2 +-
cpp/src/arrow/filesystem/s3fs.cc | 175 ++++++++++++++++++++++-----------------
cpp/src/arrow/filesystem/s3fs.h | 8 ++
3 files changed, 109 insertions(+), 76 deletions(-)
diff --git a/ci/conda_env_cpp.txt b/ci/conda_env_cpp.txt
index f8db0ff6cc..4ca76a7215 100644
--- a/ci/conda_env_cpp.txt
+++ b/ci/conda_env_cpp.txt
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-aws-sdk-cpp=1.9.379
+aws-sdk-cpp=1.10.13
benchmark>=1.6.0
boost-cpp>=1.68.0
brotli
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 7d9f4f88a6..16ffe25266 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -122,74 +122,6 @@ using internal::ToURLEncodedAwsString;
static const char kSep = '/';
-namespace {
-
-std::mutex aws_init_lock;
-Aws::SDKOptions aws_options;
-std::atomic<bool> aws_initialized(false);
-
-Status DoInitializeS3(const S3GlobalOptions& options) {
- Aws::Utils::Logging::LogLevel aws_log_level;
-
-#define LOG_LEVEL_CASE(level_name) \
- case S3LogLevel::level_name: \
- aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
- break;
-
- switch (options.log_level) {
- LOG_LEVEL_CASE(Fatal)
- LOG_LEVEL_CASE(Error)
- LOG_LEVEL_CASE(Warn)
- LOG_LEVEL_CASE(Info)
- LOG_LEVEL_CASE(Debug)
- LOG_LEVEL_CASE(Trace)
- default:
- aws_log_level = Aws::Utils::Logging::LogLevel::Off;
- }
-
-#undef LOG_LEVEL_CASE
-
- aws_options.loggingOptions.logLevel = aws_log_level;
- // By default the AWS SDK logs to files, log to console instead
- aws_options.loggingOptions.logger_create_fn = [] {
- return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
- aws_options.loggingOptions.logLevel);
- };
-#if (defined(AWS_SDK_VERSION_MAJOR) && \
- (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
- (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
- // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
- // This configuration options is only available with AWS SDK 1.9.272 and later.
- aws_options.httpOptions.compliantRfc3986Encoding = true;
-#endif
- Aws::InitAPI(aws_options);
- aws_initialized.store(true);
- return Status::OK();
-}
-
-} // namespace
-
-Status InitializeS3(const S3GlobalOptions& options) {
- std::lock_guard<std::mutex> lock(aws_init_lock);
- return DoInitializeS3(options);
-}
-
-Status FinalizeS3() {
- std::lock_guard<std::mutex> lock(aws_init_lock);
- Aws::ShutdownAPI(aws_options);
- aws_initialized.store(false);
- return Status::OK();
-}
-
-Status EnsureS3Initialized() {
- std::lock_guard<std::mutex> lock(aws_init_lock);
- if (!aws_initialized.load()) {
- S3GlobalOptions options{S3LogLevel::Fatal};
- return DoInitializeS3(options);
- }
- return Status::OK();
-}
-
// -----------------------------------------------------------------------
// S3ProxyOptions implementation
@@ -268,7 +200,7 @@ std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsStandardRetryStrategy(
// S3Options implementation
S3Options::S3Options() {
- DCHECK(aws_initialized.load()) << "Must initialize S3 before using S3Options";
+ DCHECK(IsS3Initialized()) << "Must initialize S3 before using S3Options";
}
void S3Options::ConfigureDefaultCredentials() {
@@ -459,7 +391,7 @@ bool S3Options::Equals(const S3Options& other) const {
namespace {
Status CheckS3Initialized() {
- if (!aws_initialized.load()) {
+ if (!IsS3Initialized()) {
return Status::Invalid(
"S3 subsystem not initialized; please call InitializeS3() "
"before carrying out any S3-related operation");
@@ -864,8 +796,7 @@ class RegionResolver {
}
static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
- static std::shared_ptr<RegionResolver> instance;
- auto resolver = std::atomic_load(&instance);
+ auto resolver = std::atomic_load(&instance_);
if (resolver) {
return resolver;
}
@@ -876,19 +807,24 @@ class RegionResolver {
// Make sure to always return the same instance even if several threads
// call DefaultInstance at once.
std::shared_ptr<RegionResolver> existing;
- if (std::atomic_compare_exchange_strong(&instance, &existing, *maybe_resolver)) {
+ if (std::atomic_compare_exchange_strong(&instance_, &existing, *maybe_resolver)) {
return *maybe_resolver;
} else {
return existing;
}
}
+ static void ResetDefaultInstance() {  // drop the cached default resolver (S3 finalization)
+ std::atomic_store(&instance_, std::shared_ptr<RegionResolver>());  // copies already returned stay alive
+ }
+
Result<std::string> ResolveRegion(const std::string& bucket) {
std::unique_lock<std::mutex> lock(cache_mutex_);
auto it = cache_.find(bucket);
if (it != cache_.end()) {
return it->second;
}
+ // Cache miss: do the actual region lookup
lock.unlock();
ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
lock.lock();
@@ -911,6 +847,8 @@ class RegionResolver {
return builder_.BuildClient().Value(&client_);
}
+ static std::shared_ptr<RegionResolver> instance_;
+
ClientBuilder builder_;
std::shared_ptr<S3Client> client_;
@@ -920,6 +858,8 @@ class RegionResolver {
std::unordered_map<std::string, std::string> cache_;
};
+std::shared_ptr<RegionResolver> RegionResolver::instance_;  // access only via std::atomic_* helpers
+
// -----------------------------------------------------------------------
// S3 file stream implementations
@@ -2621,9 +2561,94 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
}
-//
+// -----------------------------------------------------------------------
+// Initialization and finalization
+
+namespace {
+
+std::mutex aws_init_lock;  // serializes the initialize/finalize entry points below
+Aws::SDKOptions aws_options;  // same object is passed to InitAPI and ShutdownAPI
+std::atomic<bool> aws_initialized(false);  // atomic: IsS3Initialized() reads it unlocked
+
+Status DoInitializeS3(const S3GlobalOptions& options) {  // called with aws_init_lock held
+ Aws::Utils::Logging::LogLevel aws_log_level;
+
+#define LOG_LEVEL_CASE(level_name) \
+ case S3LogLevel::level_name: \
+ aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
+ break;
+
+ switch (options.log_level) {  // translate S3LogLevel into the AWS SDK LogLevel
+ LOG_LEVEL_CASE(Fatal)
+ LOG_LEVEL_CASE(Error)
+ LOG_LEVEL_CASE(Warn)
+ LOG_LEVEL_CASE(Info)
+ LOG_LEVEL_CASE(Debug)
+ LOG_LEVEL_CASE(Trace)
+ default:
+ aws_log_level = Aws::Utils::Logging::LogLevel::Off;  // any other value: logging off
+ }
+
+#undef LOG_LEVEL_CASE
+
+ aws_options.loggingOptions.logLevel = aws_log_level;
+ // By default the AWS SDK logs to files; log to the console instead.
+ aws_options.loggingOptions.logger_create_fn = [] {
+ return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
+ aws_options.loggingOptions.logLevel);
+ };
+#if (defined(AWS_SDK_VERSION_MAJOR) && \
+ (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
+ (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
+ // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
+ // This configuration option is only available with AWS SDK 1.9.272 and later.
+ aws_options.httpOptions.compliantRfc3986Encoding = true;
+#endif
+ Aws::InitAPI(aws_options);
+ aws_initialized.store(true);
+ return Status::OK();
+}
+
+Status DoFinalizeS3() {  // called with aws_init_lock held
+ RegionResolver::ResetDefaultInstance();  // ARROW-18397: must run before ShutdownAPI()
+ Aws::ShutdownAPI(aws_options);
+ aws_initialized.store(false);
+ return Status::OK();
+}
+
+} // namespace
+
+Status InitializeS3(const S3GlobalOptions& options) {  // initialize with explicit options
+ std::lock_guard<std::mutex> lock(aws_init_lock);
+ return DoInitializeS3(options);  // NOTE(review): runs even if already initialized
+}
+
+Status EnsureS3Initialized() {  // initialize with default options unless already done
+ std::lock_guard<std::mutex> lock(aws_init_lock);
+ if (!aws_initialized.load()) {
+ S3GlobalOptions options{S3LogLevel::Fatal};  // default log level: Fatal
+ return DoInitializeS3(options);
+ }
+ return Status::OK();
+}
+
+Status FinalizeS3() {  // shut down unconditionally; EnsureS3Finalized() is the guarded form
+ std::lock_guard<std::mutex> lock(aws_init_lock);
+ return DoFinalizeS3();  // NOTE(review): does not check aws_initialized first
+}
+
+Status EnsureS3Finalized() {  // shut down only if currently initialized
+ std::lock_guard<std::mutex> lock(aws_init_lock);
+ if (aws_initialized.load()) {
+ return DoFinalizeS3();
+ }
+ return Status::OK();  // already finalized (or never initialized): no-op
+}
+
+bool IsS3Initialized() { return aws_initialized.load(); }  // lock-free read of the flag
+
+// -----------------------------------------------------------------------
// Top-level utility functions
-//
Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
if (bucket.empty() || bucket.find_first_of(kSep) != bucket.npos ||
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 3b4731883b..ba642ebe61 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -336,10 +336,18 @@ Status InitializeS3(const S3GlobalOptions& options);
ARROW_EXPORT
Status EnsureS3Initialized();
+/// Return whether the S3 APIs are initialized and not yet finalized.
+ARROW_EXPORT
+bool IsS3Initialized();
+
/// Shutdown the S3 APIs.
ARROW_EXPORT
Status FinalizeS3();
+/// Ensure the S3 APIs are shut down; does nothing if they are not initialized.
+ARROW_EXPORT
+Status EnsureS3Finalized();
+
ARROW_EXPORT
Result<std::string> ResolveS3BucketRegion(const std::string& bucket);