You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this copy).
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/23 19:53:20 UTC

[arrow] branch master updated: ARROW-18397: [C++] Clear S3 region resolver client at S3 shutdown (#14718)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a2aef7d9e ARROW-18397: [C++] Clear S3 region resolver client at S3 shutdown (#14718)
9a2aef7d9e is described below

commit 9a2aef7d9e6adb7a0630022aebe4e3de46145af2
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Nov 23 20:53:10 2022 +0100

    ARROW-18397: [C++] Clear S3 region resolver client at S3 shutdown (#14718)
    
    This should hopefully suppress a failed assertion on recent AWS SDK versions.
    
    See https://github.com/aws/aws-sdk-cpp/issues/2204 for upstream issue report.
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
 ci/conda_env_cpp.txt             |   2 +-
 cpp/src/arrow/filesystem/s3fs.cc | 175 ++++++++++++++++++++++-----------------
 cpp/src/arrow/filesystem/s3fs.h  |   8 ++
 3 files changed, 109 insertions(+), 76 deletions(-)

diff --git a/ci/conda_env_cpp.txt b/ci/conda_env_cpp.txt
index f8db0ff6cc..4ca76a7215 100644
--- a/ci/conda_env_cpp.txt
+++ b/ci/conda_env_cpp.txt
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-aws-sdk-cpp=1.9.379
+aws-sdk-cpp=1.10.13
 benchmark>=1.6.0
 boost-cpp>=1.68.0
 brotli
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 7d9f4f88a6..16ffe25266 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -122,74 +122,6 @@ using internal::ToURLEncodedAwsString;
 
 static const char kSep = '/';
 
-namespace {
-
-std::mutex aws_init_lock;
-Aws::SDKOptions aws_options;
-std::atomic<bool> aws_initialized(false);
-
-Status DoInitializeS3(const S3GlobalOptions& options) {
-  Aws::Utils::Logging::LogLevel aws_log_level;
-
-#define LOG_LEVEL_CASE(level_name)                             \
-  case S3LogLevel::level_name:                                 \
-    aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
-    break;
-
-  switch (options.log_level) {
-    LOG_LEVEL_CASE(Fatal)
-    LOG_LEVEL_CASE(Error)
-    LOG_LEVEL_CASE(Warn)
-    LOG_LEVEL_CASE(Info)
-    LOG_LEVEL_CASE(Debug)
-    LOG_LEVEL_CASE(Trace)
-    default:
-      aws_log_level = Aws::Utils::Logging::LogLevel::Off;
-  }
-
-#undef LOG_LEVEL_CASE
-
-  aws_options.loggingOptions.logLevel = aws_log_level;
-  // By default the AWS SDK logs to files, log to console instead
-  aws_options.loggingOptions.logger_create_fn = [] {
-    return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
-        aws_options.loggingOptions.logLevel);
-  };
-#if (defined(AWS_SDK_VERSION_MAJOR) &&                          \
-     (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
-      (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
-  // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
-  // This configuration options is only available with AWS SDK 1.9.272 and later.
-  aws_options.httpOptions.compliantRfc3986Encoding = true;
-#endif
-  Aws::InitAPI(aws_options);
-  aws_initialized.store(true);
-  return Status::OK();
-}
-
-}  // namespace
-
-Status InitializeS3(const S3GlobalOptions& options) {
-  std::lock_guard<std::mutex> lock(aws_init_lock);
-  return DoInitializeS3(options);
-}
-
-Status FinalizeS3() {
-  std::lock_guard<std::mutex> lock(aws_init_lock);
-  Aws::ShutdownAPI(aws_options);
-  aws_initialized.store(false);
-  return Status::OK();
-}
-
-Status EnsureS3Initialized() {
-  std::lock_guard<std::mutex> lock(aws_init_lock);
-  if (!aws_initialized.load()) {
-    S3GlobalOptions options{S3LogLevel::Fatal};
-    return DoInitializeS3(options);
-  }
-  return Status::OK();
-}
-
 // -----------------------------------------------------------------------
 // S3ProxyOptions implementation
 
@@ -268,7 +200,7 @@ std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsStandardRetryStrategy(
 // S3Options implementation
 
 S3Options::S3Options() {
-  DCHECK(aws_initialized.load()) << "Must initialize S3 before using S3Options";
+  DCHECK(IsS3Initialized()) << "Must initialize S3 before using S3Options";
 }
 
 void S3Options::ConfigureDefaultCredentials() {
@@ -459,7 +391,7 @@ bool S3Options::Equals(const S3Options& other) const {
 namespace {
 
 Status CheckS3Initialized() {
-  if (!aws_initialized.load()) {
+  if (!IsS3Initialized()) {
     return Status::Invalid(
         "S3 subsystem not initialized; please call InitializeS3() "
         "before carrying out any S3-related operation");
@@ -864,8 +796,7 @@ class RegionResolver {
   }
 
   static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
-    static std::shared_ptr<RegionResolver> instance;
-    auto resolver = std::atomic_load(&instance);
+    auto resolver = std::atomic_load(&instance_);
     if (resolver) {
       return resolver;
     }
@@ -876,19 +807,24 @@ class RegionResolver {
     // Make sure to always return the same instance even if several threads
     // call DefaultInstance at once.
     std::shared_ptr<RegionResolver> existing;
-    if (std::atomic_compare_exchange_strong(&instance, &existing, *maybe_resolver)) {
+    if (std::atomic_compare_exchange_strong(&instance_, &existing, *maybe_resolver)) {
       return *maybe_resolver;
     } else {
       return existing;
     }
   }
 
+  static void ResetDefaultInstance() {
+    std::atomic_store(&instance_, std::shared_ptr<RegionResolver>());
+  }
+
   Result<std::string> ResolveRegion(const std::string& bucket) {
     std::unique_lock<std::mutex> lock(cache_mutex_);
     auto it = cache_.find(bucket);
     if (it != cache_.end()) {
       return it->second;
     }
+    // Cache miss: do the actual region lookup
     lock.unlock();
     ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
     lock.lock();
@@ -911,6 +847,8 @@ class RegionResolver {
     return builder_.BuildClient().Value(&client_);
   }
 
+  static std::shared_ptr<RegionResolver> instance_;
+
   ClientBuilder builder_;
   std::shared_ptr<S3Client> client_;
 
@@ -920,6 +858,8 @@ class RegionResolver {
   std::unordered_map<std::string, std::string> cache_;
 };
 
+std::shared_ptr<RegionResolver> RegionResolver::instance_;
+
 // -----------------------------------------------------------------------
 // S3 file stream implementations
 
@@ -2621,9 +2561,94 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
   return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
 }
 
-//
+// -----------------------------------------------------------------------
+// Initialization and finalization
+
+namespace {
+
+std::mutex aws_init_lock;
+Aws::SDKOptions aws_options;
+std::atomic<bool> aws_initialized(false);
+
+Status DoInitializeS3(const S3GlobalOptions& options) {
+  Aws::Utils::Logging::LogLevel aws_log_level;
+
+#define LOG_LEVEL_CASE(level_name)                             \
+  case S3LogLevel::level_name:                                 \
+    aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
+    break;
+
+  switch (options.log_level) {
+    LOG_LEVEL_CASE(Fatal)
+    LOG_LEVEL_CASE(Error)
+    LOG_LEVEL_CASE(Warn)
+    LOG_LEVEL_CASE(Info)
+    LOG_LEVEL_CASE(Debug)
+    LOG_LEVEL_CASE(Trace)
+    default:
+      aws_log_level = Aws::Utils::Logging::LogLevel::Off;
+  }
+
+#undef LOG_LEVEL_CASE
+
+  aws_options.loggingOptions.logLevel = aws_log_level;
+  // By default the AWS SDK logs to files; log to the console instead.
+  aws_options.loggingOptions.logger_create_fn = [] {
+    return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
+        aws_options.loggingOptions.logLevel);
+  };
+#if (defined(AWS_SDK_VERSION_MAJOR) &&                          \
+     (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
+      (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
+  // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
+  // This configuration option is only available with AWS SDK 1.9.272 and later.
+  aws_options.httpOptions.compliantRfc3986Encoding = true;
+#endif
+  Aws::InitAPI(aws_options);
+  aws_initialized.store(true);
+  return Status::OK();
+}
+
+Status DoFinalizeS3() {
+  RegionResolver::ResetDefaultInstance();
+  Aws::ShutdownAPI(aws_options);
+  aws_initialized.store(false);
+  return Status::OK();
+}
+
+}  // namespace
+
+Status InitializeS3(const S3GlobalOptions& options) {
+  std::lock_guard<std::mutex> lock(aws_init_lock);
+  return DoInitializeS3(options);
+}
+
+Status EnsureS3Initialized() {
+  std::lock_guard<std::mutex> lock(aws_init_lock);
+  if (!aws_initialized.load()) {
+    S3GlobalOptions options{S3LogLevel::Fatal};
+    return DoInitializeS3(options);
+  }
+  return Status::OK();
+}
+
+Status FinalizeS3() {
+  std::lock_guard<std::mutex> lock(aws_init_lock);
+  return DoFinalizeS3();
+}
+
+Status EnsureS3Finalized() {
+  std::lock_guard<std::mutex> lock(aws_init_lock);
+  if (aws_initialized.load()) {
+    return DoFinalizeS3();
+  }
+  return Status::OK();
+}
+
+bool IsS3Initialized() { return aws_initialized.load(); }
+
+// -----------------------------------------------------------------------
 // Top-level utility functions
-//
 
 Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
   if (bucket.empty() || bucket.find_first_of(kSep) != bucket.npos ||
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 3b4731883b..ba642ebe61 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -336,10 +336,18 @@ Status InitializeS3(const S3GlobalOptions& options);
 ARROW_EXPORT
 Status EnsureS3Initialized();
 
+/// Whether S3 was initialized and has not been finalized since.
+ARROW_EXPORT
+bool IsS3Initialized();
+
 /// Shutdown the S3 APIs.
 ARROW_EXPORT
 Status FinalizeS3();
 
+/// Shut down the S3 APIs if they are initialized; a no-op otherwise.
+ARROW_EXPORT
+Status EnsureS3Finalized();
+
 ARROW_EXPORT
 Result<std::string> ResolveS3BucketRegion(const std::string& bucket);