You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/04/15 03:31:50 UTC
[arrow] branch main updated: GH-15054: [C++] Change s3 finalization to happen after arrow threads finished, add pyarrow exit hook (#33858)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1de159d0f6 GH-15054: [C++] Change s3 finalization to happen after arrow threads finished, add pyarrow exit hook (#33858)
1de159d0f6 is described below
commit 1de159d0f6763766c19b183dd309b8757723b43a
Author: Weston Pace <we...@gmail.com>
AuthorDate: Fri Apr 14 20:31:39 2023 -0700
GH-15054: [C++] Change s3 finalization to happen after arrow threads finished, add pyarrow exit hook (#33858)
CRITICAL FIX: When statically linking Arrow with AWS it was possible to have a crash on shutdown/exit. Now that should no longer be possible.
BREAKING CHANGE: S3 can only be initialized and finalized once.
BREAKING CHANGE: S3 (the AWS SDK) will not be finalized until after all CPU & I/O threads are finished.
* Closes: #15054
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/filesystem/s3fs.cc | 161 +++++++++++++++++++++-------------
cpp/src/arrow/filesystem/s3fs.h | 3 +
cpp/src/arrow/filesystem/s3fs_test.cc | 7 +-
python/pyarrow/fs.py | 2 +
r/R/arrow-package.R | 14 +++
r/R/arrowExports.R | 4 +
r/src/arrowExports.cpp | 9 ++
r/src/filesystem.cpp | 7 ++
8 files changed, 142 insertions(+), 65 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 3b5846f575..a22d9c10be 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -2571,100 +2571,137 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
namespace {
-std::mutex aws_init_lock;
-Aws::SDKOptions aws_options;
-std::atomic<bool> aws_initialized(false);
+struct AwsInstance : public ::arrow::internal::Executor::Resource {
+ AwsInstance() : is_initialized_(false), is_finalized_(false) {}
+ ~AwsInstance() { Finalize(/*from_destructor=*/true); }
+
+ // Returns true iff the instance was newly initialized with `options`
+ Result<bool> EnsureInitialized(const S3GlobalOptions& options) {
+ bool expected = false;
+ if (is_finalized_.load()) {
+ return Status::Invalid("Attempt to initialize S3 after it has been finalized");
+ }
+ if (is_initialized_.compare_exchange_strong(expected, true)) {
+ DoInitialize(options);
+ return true;
+ }
+ return false;
+ }
-Status DoInitializeS3(const S3GlobalOptions& options) {
- Aws::Utils::Logging::LogLevel aws_log_level;
+ bool IsInitialized() { return !is_finalized_ && is_initialized_; }
+
+ void Finalize(bool from_destructor = false) {  // Shuts the AWS SDK down at most once
+ bool expected = true;
+ is_finalized_.store(true);  // Set first so EnsureInitialized() rejects any later call
+ if (is_initialized_.compare_exchange_strong(expected, false)) {
+ if (from_destructor) {
+ ARROW_LOG(WARNING)
+ << " arrow::fs::FinalizeS3 was not called even though S3 was initialized. "
+ "This could lead to a segmentation fault at exit";
+ }  // Only the warning is destructor-specific
+ RegionResolver::ResetDefaultInstance();  // Runs for explicit FinalizeS3() calls too
+ Aws::ShutdownAPI(aws_options_);
+ }
+ }
+
+ private:
+ void DoInitialize(const S3GlobalOptions& options) {
+ Aws::Utils::Logging::LogLevel aws_log_level;
#define LOG_LEVEL_CASE(level_name) \
case S3LogLevel::level_name: \
aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
break;
- switch (options.log_level) {
- LOG_LEVEL_CASE(Fatal)
- LOG_LEVEL_CASE(Error)
- LOG_LEVEL_CASE(Warn)
- LOG_LEVEL_CASE(Info)
- LOG_LEVEL_CASE(Debug)
- LOG_LEVEL_CASE(Trace)
- default:
- aws_log_level = Aws::Utils::Logging::LogLevel::Off;
- }
+ switch (options.log_level) {
+ LOG_LEVEL_CASE(Fatal)
+ LOG_LEVEL_CASE(Error)
+ LOG_LEVEL_CASE(Warn)
+ LOG_LEVEL_CASE(Info)
+ LOG_LEVEL_CASE(Debug)
+ LOG_LEVEL_CASE(Trace)
+ default:
+ aws_log_level = Aws::Utils::Logging::LogLevel::Off;
+ }
#undef LOG_LEVEL_CASE
#ifdef ARROW_S3_HAS_CRT
- aws_options.ioOptions.clientBootstrap_create_fn =
- [ev_threads = options.num_event_loop_threads]() {
- // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
- Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
- Aws::Crt::Io::DefaultHostResolver default_host_resolver(
- event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
- auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
- "Aws_Init_Cleanup", event_loop_group, default_host_resolver);
- client_bootstrap->EnableBlockingShutdown();
- return client_bootstrap;
- };
+ aws_options_.ioOptions.clientBootstrap_create_fn =
+ [ev_threads = options.num_event_loop_threads]() {
+ // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
+ Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
+ Aws::Crt::Io::DefaultHostResolver default_host_resolver(
+ event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
+ auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
+ "Aws_Init_Cleanup", event_loop_group, default_host_resolver);
+ client_bootstrap->EnableBlockingShutdown();
+ return client_bootstrap;
+ };
#endif
-
- aws_options.loggingOptions.logLevel = aws_log_level;
- // By default the AWS SDK logs to files, log to console instead
- aws_options.loggingOptions.logger_create_fn = [] {
- return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
- aws_options.loggingOptions.logLevel);
- };
+ aws_options_.loggingOptions.logLevel = aws_log_level;
+ // By default the AWS SDK logs to files, log to console instead
+ aws_options_.loggingOptions.logger_create_fn = [this] {
+ return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
+ aws_options_.loggingOptions.logLevel);
+ };
#if (defined(AWS_SDK_VERSION_MAJOR) && \
(AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
(AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
- // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
- // This configuration options is only available with AWS SDK 1.9.272 and later.
- aws_options.httpOptions.compliantRfc3986Encoding = true;
+ // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
+ // This configuration options is only available with AWS SDK 1.9.272 and later.
+ aws_options_.httpOptions.compliantRfc3986Encoding = true;
#endif
- Aws::InitAPI(aws_options);
- aws_initialized.store(true);
- return Status::OK();
+ Aws::InitAPI(aws_options_);
+ }
+
+ Aws::SDKOptions aws_options_;
+ std::atomic<bool> is_initialized_;
+ std::atomic<bool> is_finalized_;
+};
+
+std::shared_ptr<AwsInstance> CreateAwsInstance() {
+ auto instance = std::make_shared<AwsInstance>();
+ // Don't let S3 be shutdown until all Arrow threads are done using it
+ arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
+ io::internal::GetIOThreadPool()->KeepAlive(instance);
+ return instance;
}
-Status DoFinalizeS3() {
- RegionResolver::ResetDefaultInstance();
- Aws::ShutdownAPI(aws_options);
- aws_initialized.store(false);
- return Status::OK();
+AwsInstance& GetAwsInstance() {
+ static auto instance = CreateAwsInstance();
+ return *instance;
+}
+
+Result<bool> EnsureAwsInstanceInitialized(const S3GlobalOptions& options) {
+ return GetAwsInstance().EnsureInitialized(options);
}
} // namespace
Status InitializeS3(const S3GlobalOptions& options) {
- std::lock_guard<std::mutex> lock(aws_init_lock);
- return DoInitializeS3(options);
-}
-
-Status EnsureS3Initialized() {
- std::lock_guard<std::mutex> lock(aws_init_lock);
- if (!aws_initialized.load()) {
- S3GlobalOptions options{S3LogLevel::Fatal};
- return DoInitializeS3(options);
+ ARROW_ASSIGN_OR_RAISE(bool successfully_initialized,
+ EnsureAwsInstanceInitialized(options));
+ if (!successfully_initialized) {
+ return Status::Invalid(
+ "S3 was already initialized. It is safe to use but the options passed in this "
+ "call have been ignored.");
}
return Status::OK();
}
-Status FinalizeS3() {
- std::lock_guard<std::mutex> lock(aws_init_lock);
- return DoFinalizeS3();
+Status EnsureS3Initialized() {
+ return EnsureAwsInstanceInitialized({S3LogLevel::Fatal}).status();
}
-Status EnsureS3Finalized() {
- std::lock_guard<std::mutex> lock(aws_init_lock);
- if (aws_initialized.load()) {
- return DoFinalizeS3();
- }
+Status FinalizeS3() {
+ GetAwsInstance().Finalize();
return Status::OK();
}
-bool IsS3Initialized() { return aws_initialized.load(); }
+Status EnsureS3Finalized() { return FinalizeS3(); }
+
+bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }
// -----------------------------------------------------------------------
// Top-level utility functions
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 2be16f869d..2bccecafe8 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -335,6 +335,9 @@ struct ARROW_EXPORT S3GlobalOptions {
/// Initialize the S3 APIs. It is required to call this function at least once
/// before using S3FileSystem.
+///
+/// Once this function is called you MUST call FinalizeS3 before the end of the
+/// application in order to avoid a segmentation fault at shutdown.
ARROW_EXPORT
Status InitializeS3(const S3GlobalOptions& options);
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index 6295705785..38df84bded 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -185,7 +185,7 @@ class S3TestMixin : public AwsTestMixin {
Status connect_status;
int retries = kNumServerRetries;
do {
- InitServerAndClient();
+ ASSERT_OK(InitServerAndClient());
connect_status = OutcomeToStatus("ListBuckets", client_->ListBuckets());
} while (!connect_status.ok() && --retries > 0);
ASSERT_OK(connect_status);
@@ -198,8 +198,8 @@ class S3TestMixin : public AwsTestMixin {
}
protected:
- void InitServerAndClient() {
- ASSERT_OK_AND_ASSIGN(minio_, GetMinioEnv()->GetOneServer());
+ Status InitServerAndClient() {
+ ARROW_ASSIGN_OR_RAISE(minio_, GetMinioEnv()->GetOneServer());
client_config_.reset(new Aws::Client::ClientConfiguration());
client_config_->endpointOverride = ToAwsString(minio_->connect_string());
client_config_->scheme = Aws::Http::Scheme::HTTP;
@@ -211,6 +211,7 @@ class S3TestMixin : public AwsTestMixin {
new Aws::S3::S3Client(credentials_, *client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing));
+ return Status::OK();
}
// How many times to try launching a server in a row before decreeing failure
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index e8e53225fb..567bea8ac0 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -59,6 +59,8 @@ except ImportError:
_not_imported.append("S3FileSystem")
else:
ensure_s3_initialized()
+ import atexit
+ atexit.register(finalize_s3)
def __getattr__(name):
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index a3c860a51c..76c420e21f 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -106,6 +106,15 @@ supported_dplyr_methods <- list(
explain = NULL
)
+# This should be run at session exit and must be called
+# to avoid a segmentation fault at shutdown
+finalize_s3 <- function(env) {
+ FinalizeS3()
+}
+
+# Helper environment to register the exit hook
+s3_finalizer <- new.env(parent = emptyenv())
+
#' @importFrom vctrs s3_register vec_size vec_cast vec_unique
.onLoad <- function(...) {
# Make sure C++ knows on which thread it is safe to call the R API
@@ -147,6 +156,11 @@ supported_dplyr_methods <- list(
# Register extension types that we use internally
reregister_extension_type(vctrs_extension_type(vctrs::unspecified()))
+ # Registers a callback to run at session exit
+ # This can't be done in .onUnload or .onDetach because those hooks are
+ # not guaranteed to run (e.g. they only run if the user unloads arrow)
+ reg.finalizer(s3_finalizer, finalize_s3, onexit = TRUE)
+
invisible()
}
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index b3dd3a9601..a8e8f5b8af 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1352,6 +1352,10 @@ fs___S3FileSystem__region <- function(fs) {
.Call(`_arrow_fs___S3FileSystem__region`, fs)
}
+FinalizeS3 <- function() {
+ invisible(.Call(`_arrow_FinalizeS3`))
+}
+
fs___GcsFileSystem__Make <- function(anonymous, options) {
.Call(`_arrow_fs___GcsFileSystem__Make`, anonymous, options)
}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index fb333e0c07..55c59f4b38 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -3489,6 +3489,14 @@ extern "C" SEXP _arrow_fs___S3FileSystem__region(SEXP fs_sexp){
}
#endif
+// filesystem.cpp
+void FinalizeS3();
+extern "C" SEXP _arrow_FinalizeS3(){
+BEGIN_CPP11
+ FinalizeS3();
+ return R_NilValue;
+END_CPP11
+}
// filesystem.cpp
#if defined(ARROW_R_WITH_GCS)
std::shared_ptr<fs::GcsFileSystem> fs___GcsFileSystem__Make(bool anonymous, cpp11::list options);
@@ -5828,6 +5836,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_fs___CopyFiles", (DL_FUNC) &_arrow_fs___CopyFiles, 6},
{ "_arrow_fs___S3FileSystem__create", (DL_FUNC) &_arrow_fs___S3FileSystem__create, 17},
{ "_arrow_fs___S3FileSystem__region", (DL_FUNC) &_arrow_fs___S3FileSystem__region, 1},
+ { "_arrow_FinalizeS3", (DL_FUNC) &_arrow_FinalizeS3, 0},
{ "_arrow_fs___GcsFileSystem__Make", (DL_FUNC) &_arrow_fs___GcsFileSystem__Make, 2},
{ "_arrow_fs___GcsFileSystem__options", (DL_FUNC) &_arrow_fs___GcsFileSystem__options, 1},
{ "_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2},
diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp
index cd795c0f80..4388d111b4 100644
--- a/r/src/filesystem.cpp
+++ b/r/src/filesystem.cpp
@@ -351,6 +351,13 @@ std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& f
#endif
+// [[arrow::export]]
+void FinalizeS3() {
+#if defined(ARROW_R_WITH_S3)
+ StopIfNotOk(fs::FinalizeS3());
+#endif
+}
+
#if defined(ARROW_R_WITH_GCS)
#include <arrow/filesystem/gcsfs.h>