You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "felipecrv (via GitHub)" <gi...@apache.org> on 2023/04/13 01:42:20 UTC

[GitHub] [arrow] felipecrv commented on a diff in pull request #33858: GH-15054: [C++] Change s3 finalization to happen after arrow threads finished, add pyarrow exit hook

felipecrv commented on code in PR #33858:
URL: https://github.com/apache/arrow/pull/33858#discussion_r1164854000


##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2573,98 +2573,105 @@ namespace {
 
 std::mutex aws_init_lock;
 Aws::SDKOptions aws_options;
-std::atomic<bool> aws_initialized(false);
 
-Status DoInitializeS3(const S3GlobalOptions& options) {
-  Aws::Utils::Logging::LogLevel aws_log_level;
+struct AwsInstance : public ::arrow::internal::Executor::Resource {
+  explicit AwsInstance(const S3GlobalOptions& options) {
+    Aws::Utils::Logging::LogLevel aws_log_level;
 
 #define LOG_LEVEL_CASE(level_name)                             \
   case S3LogLevel::level_name:                                 \
     aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
     break;
 
-  switch (options.log_level) {
-    LOG_LEVEL_CASE(Fatal)
-    LOG_LEVEL_CASE(Error)
-    LOG_LEVEL_CASE(Warn)
-    LOG_LEVEL_CASE(Info)
-    LOG_LEVEL_CASE(Debug)
-    LOG_LEVEL_CASE(Trace)
-    default:
-      aws_log_level = Aws::Utils::Logging::LogLevel::Off;
-  }
+    switch (options.log_level) {
+      LOG_LEVEL_CASE(Fatal)
+      LOG_LEVEL_CASE(Error)
+      LOG_LEVEL_CASE(Warn)
+      LOG_LEVEL_CASE(Info)
+      LOG_LEVEL_CASE(Debug)
+      LOG_LEVEL_CASE(Trace)
+      default:
+        aws_log_level = Aws::Utils::Logging::LogLevel::Off;
+    }
 
 #undef LOG_LEVEL_CASE
 
 #ifdef ARROW_S3_HAS_CRT
-  aws_options.ioOptions.clientBootstrap_create_fn =
-      [ev_threads = options.num_event_loop_threads]() {
-        // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
-        Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
-        Aws::Crt::Io::DefaultHostResolver default_host_resolver(
-            event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
-        auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
-            "Aws_Init_Cleanup", event_loop_group, default_host_resolver);
-        client_bootstrap->EnableBlockingShutdown();
-        return client_bootstrap;
-      };
+    aws_options.ioOptions.clientBootstrap_create_fn =
+        [ev_threads = options.num_event_loop_threads]() {
+          // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
+          Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
+          Aws::Crt::Io::DefaultHostResolver default_host_resolver(
+              event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
+          auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
+              "Aws_Init_Cleanup", event_loop_group, default_host_resolver);
+          client_bootstrap->EnableBlockingShutdown();
+          return client_bootstrap;
+        };
 #endif
-
-  aws_options.loggingOptions.logLevel = aws_log_level;
-  // By default the AWS SDK logs to files, log to console instead
-  aws_options.loggingOptions.logger_create_fn = [] {
-    return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
-        aws_options.loggingOptions.logLevel);
-  };
+    aws_options.loggingOptions.logLevel = aws_log_level;
+    // By default the AWS SDK logs to files, log to console instead
+    aws_options.loggingOptions.logger_create_fn = [] {
+      return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
+          aws_options.loggingOptions.logLevel);
+    };
 #if (defined(AWS_SDK_VERSION_MAJOR) &&                          \
      (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
       (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
-  // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
-  // This configuration options is only available with AWS SDK 1.9.272 and later.
-  aws_options.httpOptions.compliantRfc3986Encoding = true;
+    // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
+    // This configuration options is only available with AWS SDK 1.9.272 and later.
+    aws_options.httpOptions.compliantRfc3986Encoding = true;
 #endif
-  Aws::InitAPI(aws_options);
-  aws_initialized.store(true);
-  return Status::OK();
+    Aws::InitAPI(aws_options);
+  }
+
+  ~AwsInstance() {
+    RegionResolver::ResetDefaultInstance();
+    Aws::ShutdownAPI(aws_options);
+  }
+};
+
+std::shared_ptr<AwsInstance> CreateAwsInstance(const S3GlobalOptions& options) {
+  auto instance = std::make_shared<AwsInstance>(options);
+  // Don't let S3 be shutdown until all Arrow threads are done using it
+  arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
+  io::internal::GetIOThreadPool()->KeepAlive(instance);
+  return instance;
 }
 
-Status DoFinalizeS3() {
-  RegionResolver::ResetDefaultInstance();
-  Aws::ShutdownAPI(aws_options);
-  aws_initialized.store(false);
-  return Status::OK();
+std::shared_ptr<AwsInstance>* GetAwsInstance(const S3GlobalOptions& options) {
+  static auto instance = CreateAwsInstance(options);
+  return &instance;
 }
 
+bool IsAwsInitialized() { return !!GetAwsInstance({}); }

Review Comment:
   If this gets called before `InitializeS3`, the `options` passed there will be ignored and an instance based on `{}` options will stick around. I think bringing back the `bool` flag can avoid these risks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org