You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/04/15 03:31:50 UTC

[arrow] branch main updated: GH-15054: [C++] Change s3 finalization to happen after arrow threads finished, add pyarrow exit hook (#33858)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1de159d0f6 GH-15054: [C++] Change s3 finalization to happen after arrow threads finished, add pyarrow exit hook (#33858)
1de159d0f6 is described below

commit 1de159d0f6763766c19b183dd309b8757723b43a
Author: Weston Pace <we...@gmail.com>
AuthorDate: Fri Apr 14 20:31:39 2023 -0700

    GH-15054: [C++] Change s3 finalization to happen after arrow threads finished, add pyarrow exit hook (#33858)
    
    CRITICAL FIX: When statically linking Arrow with AWS it was possible to have a crash on shutdown/exit.  Now that should no longer be possible.
    
    BREAKING CHANGE: S3 can only be initialized and finalized once.
    
    BREAKING CHANGE: S3 (the AWS SDK) will not be finalized until after all CPU & I/O threads are finished.
    * Closes: #15054
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/filesystem/s3fs.cc      | 161 +++++++++++++++++++++-------------
 cpp/src/arrow/filesystem/s3fs.h       |   3 +
 cpp/src/arrow/filesystem/s3fs_test.cc |   7 +-
 python/pyarrow/fs.py                  |   2 +
 r/R/arrow-package.R                   |  14 +++
 r/R/arrowExports.R                    |   4 +
 r/src/arrowExports.cpp                |   9 ++
 r/src/filesystem.cpp                  |   7 ++
 8 files changed, 142 insertions(+), 65 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 3b5846f575..a22d9c10be 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -2571,100 +2571,137 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
 
 namespace {
 
-std::mutex aws_init_lock;
-Aws::SDKOptions aws_options;
-std::atomic<bool> aws_initialized(false);
+struct AwsInstance : public ::arrow::internal::Executor::Resource {
+  AwsInstance() : is_initialized_(false), is_finalized_(false) {}
+  ~AwsInstance() { Finalize(/*from_destructor=*/true); }
+
+  // Returns true iff the instance was newly initialized with `options`
+  Result<bool> EnsureInitialized(const S3GlobalOptions& options) {
+    bool expected = false;
+    if (is_finalized_.load()) {
+      return Status::Invalid("Attempt to initialize S3 after it has been finalized");
+    }
+    if (is_initialized_.compare_exchange_strong(expected, true)) {
+      DoInitialize(options);
+      return true;
+    }
+    return false;
+  }
 
-Status DoInitializeS3(const S3GlobalOptions& options) {
-  Aws::Utils::Logging::LogLevel aws_log_level;
+  bool IsInitialized() { return !is_finalized_ && is_initialized_; }
+
+  void Finalize(bool from_destructor = false) {
+    bool expected = true;
+    is_finalized_.store(true);
+    if (is_initialized_.compare_exchange_strong(expected, false)) {
+      if (from_destructor) {
+        ARROW_LOG(WARNING)
+            << " arrow::fs::FinalizeS3 was not called even though S3 was initialized.  "
+               "This could lead to a segmentation fault at exit";
+        RegionResolver::ResetDefaultInstance();
+        Aws::ShutdownAPI(aws_options_);
+      }
+    }
+  }
+
+ private:
+  void DoInitialize(const S3GlobalOptions& options) {
+    Aws::Utils::Logging::LogLevel aws_log_level;
 
 #define LOG_LEVEL_CASE(level_name)                             \
   case S3LogLevel::level_name:                                 \
     aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
     break;
 
-  switch (options.log_level) {
-    LOG_LEVEL_CASE(Fatal)
-    LOG_LEVEL_CASE(Error)
-    LOG_LEVEL_CASE(Warn)
-    LOG_LEVEL_CASE(Info)
-    LOG_LEVEL_CASE(Debug)
-    LOG_LEVEL_CASE(Trace)
-    default:
-      aws_log_level = Aws::Utils::Logging::LogLevel::Off;
-  }
+    switch (options.log_level) {
+      LOG_LEVEL_CASE(Fatal)
+      LOG_LEVEL_CASE(Error)
+      LOG_LEVEL_CASE(Warn)
+      LOG_LEVEL_CASE(Info)
+      LOG_LEVEL_CASE(Debug)
+      LOG_LEVEL_CASE(Trace)
+      default:
+        aws_log_level = Aws::Utils::Logging::LogLevel::Off;
+    }
 
 #undef LOG_LEVEL_CASE
 
 #ifdef ARROW_S3_HAS_CRT
-  aws_options.ioOptions.clientBootstrap_create_fn =
-      [ev_threads = options.num_event_loop_threads]() {
-        // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
-        Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
-        Aws::Crt::Io::DefaultHostResolver default_host_resolver(
-            event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
-        auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
-            "Aws_Init_Cleanup", event_loop_group, default_host_resolver);
-        client_bootstrap->EnableBlockingShutdown();
-        return client_bootstrap;
-      };
+    aws_options_.ioOptions.clientBootstrap_create_fn =
+        [ev_threads = options.num_event_loop_threads]() {
+          // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
+          Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
+          Aws::Crt::Io::DefaultHostResolver default_host_resolver(
+              event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
+          auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
+              "Aws_Init_Cleanup", event_loop_group, default_host_resolver);
+          client_bootstrap->EnableBlockingShutdown();
+          return client_bootstrap;
+        };
 #endif
-
-  aws_options.loggingOptions.logLevel = aws_log_level;
-  // By default the AWS SDK logs to files, log to console instead
-  aws_options.loggingOptions.logger_create_fn = [] {
-    return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
-        aws_options.loggingOptions.logLevel);
-  };
+    aws_options_.loggingOptions.logLevel = aws_log_level;
+    // By default the AWS SDK logs to files, log to console instead
+    aws_options_.loggingOptions.logger_create_fn = [this] {
+      return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
+          aws_options_.loggingOptions.logLevel);
+    };
 #if (defined(AWS_SDK_VERSION_MAJOR) &&                          \
      (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
       (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
-  // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
-  // This configuration options is only available with AWS SDK 1.9.272 and later.
-  aws_options.httpOptions.compliantRfc3986Encoding = true;
+    // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
+    // This configuration options is only available with AWS SDK 1.9.272 and later.
+    aws_options_.httpOptions.compliantRfc3986Encoding = true;
 #endif
-  Aws::InitAPI(aws_options);
-  aws_initialized.store(true);
-  return Status::OK();
+    Aws::InitAPI(aws_options_);
+  }
+
+  Aws::SDKOptions aws_options_;
+  std::atomic<bool> is_initialized_;
+  std::atomic<bool> is_finalized_;
+};
+
+std::shared_ptr<AwsInstance> CreateAwsInstance() {
+  auto instance = std::make_shared<AwsInstance>();
+  // Don't let S3 be shutdown until all Arrow threads are done using it
+  arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
+  io::internal::GetIOThreadPool()->KeepAlive(instance);
+  return instance;
 }
 
-Status DoFinalizeS3() {
-  RegionResolver::ResetDefaultInstance();
-  Aws::ShutdownAPI(aws_options);
-  aws_initialized.store(false);
-  return Status::OK();
+AwsInstance& GetAwsInstance() {
+  static auto instance = CreateAwsInstance();
+  return *instance;
+}
+
+Result<bool> EnsureAwsInstanceInitialized(const S3GlobalOptions& options) {
+  return GetAwsInstance().EnsureInitialized(options);
 }
 
 }  // namespace
 
 Status InitializeS3(const S3GlobalOptions& options) {
-  std::lock_guard<std::mutex> lock(aws_init_lock);
-  return DoInitializeS3(options);
-}
-
-Status EnsureS3Initialized() {
-  std::lock_guard<std::mutex> lock(aws_init_lock);
-  if (!aws_initialized.load()) {
-    S3GlobalOptions options{S3LogLevel::Fatal};
-    return DoInitializeS3(options);
+  ARROW_ASSIGN_OR_RAISE(bool successfully_initialized,
+                        EnsureAwsInstanceInitialized(options));
+  if (!successfully_initialized) {
+    return Status::Invalid(
+        "S3 was already initialized.  It is safe to use but the options passed in this "
+        "call have been ignored.");
   }
   return Status::OK();
 }
 
-Status FinalizeS3() {
-  std::lock_guard<std::mutex> lock(aws_init_lock);
-  return DoFinalizeS3();
+Status EnsureS3Initialized() {
+  return EnsureAwsInstanceInitialized({S3LogLevel::Fatal}).status();
 }
 
-Status EnsureS3Finalized() {
-  std::lock_guard<std::mutex> lock(aws_init_lock);
-  if (aws_initialized.load()) {
-    return DoFinalizeS3();
-  }
+Status FinalizeS3() {
+  GetAwsInstance().Finalize();
   return Status::OK();
 }
 
-bool IsS3Initialized() { return aws_initialized.load(); }
+Status EnsureS3Finalized() { return FinalizeS3(); }
+
+bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }
 
 // -----------------------------------------------------------------------
 // Top-level utility functions
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 2be16f869d..2bccecafe8 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -335,6 +335,9 @@ struct ARROW_EXPORT S3GlobalOptions {
 
 /// Initialize the S3 APIs.  It is required to call this function at least once
 /// before using S3FileSystem.
+///
+/// Once this function is called you MUST call FinalizeS3 before the end of the
+/// application in order to avoid a segmentation fault at shutdown.
 ARROW_EXPORT
 Status InitializeS3(const S3GlobalOptions& options);
 
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index 6295705785..38df84bded 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -185,7 +185,7 @@ class S3TestMixin : public AwsTestMixin {
     Status connect_status;
     int retries = kNumServerRetries;
     do {
-      InitServerAndClient();
+      ASSERT_OK(InitServerAndClient());
       connect_status = OutcomeToStatus("ListBuckets", client_->ListBuckets());
     } while (!connect_status.ok() && --retries > 0);
     ASSERT_OK(connect_status);
@@ -198,8 +198,8 @@ class S3TestMixin : public AwsTestMixin {
   }
 
  protected:
-  void InitServerAndClient() {
-    ASSERT_OK_AND_ASSIGN(minio_, GetMinioEnv()->GetOneServer());
+  Status InitServerAndClient() {
+    ARROW_ASSIGN_OR_RAISE(minio_, GetMinioEnv()->GetOneServer());
     client_config_.reset(new Aws::Client::ClientConfiguration());
     client_config_->endpointOverride = ToAwsString(minio_->connect_string());
     client_config_->scheme = Aws::Http::Scheme::HTTP;
@@ -211,6 +211,7 @@ class S3TestMixin : public AwsTestMixin {
         new Aws::S3::S3Client(credentials_, *client_config_,
                               Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
                               use_virtual_addressing));
+    return Status::OK();
   }
 
   // How many times to try launching a server in a row before decreeing failure
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index e8e53225fb..567bea8ac0 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -59,6 +59,8 @@ except ImportError:
     _not_imported.append("S3FileSystem")
 else:
     ensure_s3_initialized()
+    import atexit
+    atexit.register(finalize_s3)
 
 
 def __getattr__(name):
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index a3c860a51c..76c420e21f 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -106,6 +106,15 @@ supported_dplyr_methods <- list(
   explain = NULL
 )
 
+# This should be run at session exit and must be called
+# to avoid a segmentation fault at shutdown
+finalize_s3 <- function(env) {
+  FinalizeS3()
+}
+
+# Helper environment to register the exit hook
+s3_finalizer <- new.env(parent = emptyenv())
+
 #' @importFrom vctrs s3_register vec_size vec_cast vec_unique
 .onLoad <- function(...) {
   # Make sure C++ knows on which thread it is safe to call the R API
@@ -147,6 +156,11 @@ supported_dplyr_methods <- list(
   # Register extension types that we use internally
   reregister_extension_type(vctrs_extension_type(vctrs::unspecified()))
 
+  # Registers a callback to run at session exit
+  # This can't be done in .onUnload or .onDetach because those hooks are
+  # not guaranteed to run (e.g. they only run if the user unloads arrow)
+  reg.finalizer(s3_finalizer, finalize_s3, onexit = TRUE)
+
   invisible()
 }
 
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index b3dd3a9601..a8e8f5b8af 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1352,6 +1352,10 @@ fs___S3FileSystem__region <- function(fs) {
   .Call(`_arrow_fs___S3FileSystem__region`, fs)
 }
 
+FinalizeS3 <- function() {
+  invisible(.Call(`_arrow_FinalizeS3`))
+}
+
 fs___GcsFileSystem__Make <- function(anonymous, options) {
   .Call(`_arrow_fs___GcsFileSystem__Make`, anonymous, options)
 }
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index fb333e0c07..55c59f4b38 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -3489,6 +3489,14 @@ extern "C" SEXP _arrow_fs___S3FileSystem__region(SEXP fs_sexp){
 }
 #endif
 
+// filesystem.cpp
+void FinalizeS3();
+extern "C" SEXP _arrow_FinalizeS3(){
+BEGIN_CPP11
+	FinalizeS3();
+	return R_NilValue;
+END_CPP11
+}
 // filesystem.cpp
 #if defined(ARROW_R_WITH_GCS)
 std::shared_ptr<fs::GcsFileSystem> fs___GcsFileSystem__Make(bool anonymous, cpp11::list options);
@@ -5828,6 +5836,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_fs___CopyFiles", (DL_FUNC) &_arrow_fs___CopyFiles, 6}, 
 		{ "_arrow_fs___S3FileSystem__create", (DL_FUNC) &_arrow_fs___S3FileSystem__create, 17}, 
 		{ "_arrow_fs___S3FileSystem__region", (DL_FUNC) &_arrow_fs___S3FileSystem__region, 1}, 
+		{ "_arrow_FinalizeS3", (DL_FUNC) &_arrow_FinalizeS3, 0}, 
 		{ "_arrow_fs___GcsFileSystem__Make", (DL_FUNC) &_arrow_fs___GcsFileSystem__Make, 2}, 
 		{ "_arrow_fs___GcsFileSystem__options", (DL_FUNC) &_arrow_fs___GcsFileSystem__options, 1}, 
 		{ "_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2}, 
diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp
index cd795c0f80..4388d111b4 100644
--- a/r/src/filesystem.cpp
+++ b/r/src/filesystem.cpp
@@ -351,6 +351,13 @@ std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& f
 
 #endif
 
+// [[arrow::export]]
+void FinalizeS3() {
+#if defined(ARROW_R_WITH_S3)
+  StopIfNotOk(fs::FinalizeS3());
+#endif
+}
+
 #if defined(ARROW_R_WITH_GCS)
 
 #include <arrow/filesystem/gcsfs.h>