Posted to commits@arrow.apache.org by ap...@apache.org on 2020/09/21 18:14:09 UTC

[arrow] branch master updated: ARROW-9775: [C++] Automatic S3 region selection

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 248803c  ARROW-9775: [C++] Automatic S3 region selection
248803c is described below

commit 248803c32c4407c60e3dabce28539348291e3d7a
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Sep 21 20:13:40 2020 +0200

    ARROW-9775: [C++] Automatic S3 region selection
    
    Once looked up, the region of a given bucket is cached for the lifetime of the process.
    (The cache size is unbounded, which shouldn't be a problem in practice.)
    
    S3Options::FromUri() now resolves the region for the bucket if it wasn't given as a URI query parameter.
    
    Closes #8205 from pitrou/ARROW-9775-s3-auto-region
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
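
    A minimal sketch of the resulting API, assuming S3 has been initialized
    via InitializeS3(); the RegionExample wrapper is hypothetical, while the
    bucket name and region come from the new tests below:

        #include <iostream>
        #include <string>

        #include "arrow/filesystem/s3fs.h"
        #include "arrow/result.h"
        #include "arrow/status.h"

        arrow::Status RegionExample() {
          // Explicit lookup: a HEAD request on first use, after which the
          // process-wide cache answers subsequent calls.
          ARROW_ASSIGN_OR_RAISE(
              auto region, arrow::fs::ResolveBucketRegion("ursa-labs-taxi-data"));
          std::cout << region << std::endl;  // prints "us-east-2"

          // Implicit lookup: FromUri() resolves the region when the URI
          // doesn't carry a ?region=... query parameter.
          std::string path;
          ARROW_ASSIGN_OR_RAISE(
              auto options,
              arrow::fs::S3Options::FromUri("s3://ursa-labs-taxi-data/2019", &path));
          std::cout << options.region << std::endl;  // also "us-east-2"
          return arrow::Status::OK();
        }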
---
 cpp/src/arrow/filesystem/s3fs.cc                | 219 ++++++++++++++++++++----
 cpp/src/arrow/filesystem/s3fs.h                 |   7 +
 cpp/src/arrow/filesystem/s3fs_narrative_test.cc |   6 +-
 cpp/src/arrow/filesystem/s3fs_test.cc           |  51 ++++--
 cpp/src/arrow/type_traits.h                     |   2 +-
 cpp/src/arrow/util/atomic_shared_ptr.h          |  34 +++-
 python/pyarrow/_s3fs.pyx                        |   7 +
 python/pyarrow/tests/test_fs.py                 |  27 ++-
 8 files changed, 299 insertions(+), 54 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 403f278..ebc2ea2 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -36,9 +36,11 @@
 #endif
 
 #include <aws/core/Aws.h>
+#include <aws/core/Region.h>
 #include <aws/core/auth/AWSCredentials.h>
 #include <aws/core/auth/AWSCredentialsProviderChain.h>
 #include <aws/core/client/RetryStrategy.h>
+#include <aws/core/http/HttpResponse.h>
 #include <aws/core/utils/logging/ConsoleLogSystem.h>
 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
 #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
@@ -71,6 +73,7 @@
 #include "arrow/io/util_internal.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/atomic_shared_ptr.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/windows_fixup.h"
@@ -161,6 +164,9 @@ Status EnsureS3Initialized() {
   return Status::OK();
 }
 
+// -----------------------------------------------------------------------
+// S3Options implementation
+
 void S3Options::ConfigureDefaultCredentials() {
   credentials_provider =
       std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
@@ -271,9 +277,11 @@ Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
     options.ConfigureDefaultCredentials();
   }
 
+  bool region_set = false;
   for (const auto& kv : options_map) {
     if (kv.first == "region") {
       options.region = kv.second;
+      region_set = true;
     } else if (kv.first == "scheme") {
       options.scheme = kv.second;
     } else if (kv.first == "endpoint_override") {
@@ -283,6 +291,11 @@ Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
     }
   }
 
+  if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
+    // XXX Should we use a dedicated resolver with the given credentials?
+    ARROW_ASSIGN_OR_RAISE(options.region, ResolveBucketRegion(bucket));
+  }
+
   return options;
 }
 
@@ -403,6 +416,161 @@ std::string FormatRange(int64_t start, int64_t length) {
   return ss.str();
 }
 
+class S3Client : public Aws::S3::S3Client {
+ public:
+  using Aws::S3::S3Client::S3Client;
+
+  // To get a bucket's region, we must extract the "x-amz-bucket-region" header
+  // from the response to a HEAD bucket request.
+  // Unfortunately, the S3Client APIs don't let us access the headers of successful
+  // responses.  So we have to cook an AWS request and issue it ourselves.
+
+  Result<std::string> GetBucketRegion(const S3Model::HeadBucketRequest& request) {
+    auto uri = GeneratePresignedUrl(request.GetBucket(),
+                                    /*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD);
+    // NOTE: The signer region argument isn't passed here, as there's no easy
+    // way of computing it (the relevant method is private).
+    auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD,
+                               Aws::Auth::SIGV4_SIGNER);
+    const auto code = outcome.IsSuccess() ? outcome.GetResult().GetResponseCode()
+                                          : outcome.GetError().GetResponseCode();
+    const auto& headers = outcome.IsSuccess()
+                              ? outcome.GetResult().GetHeaderValueCollection()
+                              : outcome.GetError().GetResponseHeaders();
+
+    const auto it = headers.find(ToAwsString("x-amz-bucket-region"));
+    if (it == headers.end()) {
+      if (code == Aws::Http::HttpResponseCode::NOT_FOUND) {
+        return Status::IOError("Bucket '", request.GetBucket(), "' not found");
+      } else if (!outcome.IsSuccess()) {
+        return ErrorToStatus(std::forward_as_tuple("When resolving region for bucket '",
+                                                   request.GetBucket(), "': "),
+                             outcome.GetError());
+      } else {
+        return Status::IOError("When resolving region for bucket '", request.GetBucket(),
+                               "': missing 'x-amz-bucket-region' header in response");
+      }
+    }
+    return std::string(FromAwsString(it->second));
+  }
+
+  Result<std::string> GetBucketRegion(const std::string& bucket) {
+    S3Model::HeadBucketRequest req;
+    req.SetBucket(ToAwsString(bucket));
+    return GetBucketRegion(req);
+  }
+};
+
+class ClientBuilder {
+ public:
+  explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
+
+  Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }
+
+  Result<std::unique_ptr<S3Client>> BuildClient() {
+    credentials_provider_ = options_.credentials_provider;
+    client_config_.region = ToAwsString(options_.region);
+    client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
+    if (options_.scheme == "http") {
+      client_config_.scheme = Aws::Http::Scheme::HTTP;
+    } else if (options_.scheme == "https") {
+      client_config_.scheme = Aws::Http::Scheme::HTTPS;
+    } else {
+      return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
+    }
+    client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
+    if (!internal::global_options.tls_ca_file_path.empty()) {
+      client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
+    }
+    if (!internal::global_options.tls_ca_dir_path.empty()) {
+      client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
+    }
+
+    const bool use_virtual_addressing = options_.endpoint_override.empty();
+    return std::unique_ptr<S3Client>(
+        new S3Client(credentials_provider_, client_config_,
+                     Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+                     use_virtual_addressing));
+  }
+
+  const S3Options& options() const { return options_; }
+
+ protected:
+  S3Options options_;
+  Aws::Client::ClientConfiguration client_config_;
+  std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
+};
+
+// -----------------------------------------------------------------------
+// S3 region resolver
+
+class RegionResolver {
+ public:
+  static Result<std::shared_ptr<RegionResolver>> Make(S3Options options) {
+    std::shared_ptr<RegionResolver> resolver(new RegionResolver(std::move(options)));
+    RETURN_NOT_OK(resolver->Init());
+    return resolver;
+  }
+
+  static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
+    static std::shared_ptr<RegionResolver> instance;
+    auto resolver = arrow::internal::atomic_load(&instance);
+    if (resolver) {
+      return resolver;
+    }
+    auto maybe_resolver = Make(S3Options::Anonymous());
+    if (!maybe_resolver.ok()) {
+      return maybe_resolver;
+    }
+    // Make sure to always return the same instance even if several threads
+    // call DefaultInstance at once.
+    std::shared_ptr<RegionResolver> existing;
+    if (arrow::internal::atomic_compare_exchange_strong(&instance, &existing,
+                                                        *maybe_resolver)) {
+      return *maybe_resolver;
+    } else {
+      return existing;
+    }
+  }
+
+  Result<std::string> ResolveRegion(const std::string& bucket) {
+    std::unique_lock<std::mutex> lock(cache_mutex_);
+    auto it = cache_.find(bucket);
+    if (it != cache_.end()) {
+      return it->second;
+    }
+    lock.unlock();
+    ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
+    lock.lock();
+    // Note we don't cache a non-existent bucket, as the bucket could be created later
+    cache_[bucket] = region;
+    return region;
+  }
+
+  Result<std::string> ResolveRegionUncached(const std::string& bucket) {
+    return client_->GetBucketRegion(bucket);
+  }
+
+ protected:
+  explicit RegionResolver(S3Options options) : builder_(std::move(options)) {}
+
+  Status Init() {
+    DCHECK(builder_.options().endpoint_override.empty());
+    return builder_.BuildClient().Value(&client_);
+  }
+
+  ClientBuilder builder_;
+  std::unique_ptr<S3Client> client_;
+
+  std::mutex cache_mutex_;
+  // XXX Should cache size be bounded?  It must be quite unusual to query millions
+  // of different buckets in a single program invocation...
+  std::unordered_map<std::string, std::string> cache_;
+};
+
+// -----------------------------------------------------------------------
+// S3 file stream implementations
+
 // A non-copying iostream.
 // See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
 // https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
@@ -894,11 +1062,12 @@ void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
 
 }  // namespace
 
+// -----------------------------------------------------------------------
+// S3 filesystem implementation
+
 class S3FileSystem::Impl {
  public:
-  S3Options options_;
-  Aws::Client::ClientConfiguration client_config_;
-  std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
+  ClientBuilder builder_;
   std::unique_ptr<Aws::S3::S3Client> client_;
 
   const int32_t kListObjectsMaxKeys = 1000;
@@ -907,36 +1076,11 @@ class S3FileSystem::Impl {
   // Limit recursing depth, since a recursion bomb can be created
   const int32_t kMaxNestingDepth = 100;
 
-  explicit Impl(S3Options options) : options_(std::move(options)) {}
+  explicit Impl(S3Options options) : builder_(std::move(options)) {}
 
-  Status Init() {
-    credentials_provider_ = options_.credentials_provider;
-    client_config_.region = ToAwsString(options_.region);
-    client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
-    if (options_.scheme == "http") {
-      client_config_.scheme = Aws::Http::Scheme::HTTP;
-    } else if (options_.scheme == "https") {
-      client_config_.scheme = Aws::Http::Scheme::HTTPS;
-    } else {
-      return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
-    }
-    client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
-    if (!internal::global_options.tls_ca_file_path.empty()) {
-      client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
-    }
-    if (!internal::global_options.tls_ca_dir_path.empty()) {
-      client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
-    }
-
-    bool use_virtual_addressing = options_.endpoint_override.empty();
-    client_.reset(
-        new Aws::S3::S3Client(credentials_provider_, client_config_,
-                              Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
-                              use_virtual_addressing));
-    return Status::OK();
-  }
+  Status Init() { return builder_.BuildClient().Value(&client_); }
 
-  S3Options options() const { return options_; }
+  const S3Options& options() const { return builder_.options(); }
 
   // Create a bucket.  Successful if bucket already exists.
   Status CreateBucket(const std::string& bucket) {
@@ -944,7 +1088,7 @@ class S3FileSystem::Impl {
     S3Model::CreateBucketRequest req;
     config.SetLocationConstraint(
         S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
-            ToAwsString(options_.region)));
+            ToAwsString(options().region)));
     req.SetBucket(ToAwsString(bucket));
     req.SetCreateBucketConfiguration(config);
 
@@ -1586,7 +1730,7 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
   RETURN_NOT_OK(ValidateFilePath(path));
 
   auto ptr = std::make_shared<ObjectOutputStream>(
-      shared_from_this(), impl_->client_.get(), path, impl_->options_);
+      shared_from_this(), impl_->client_.get(), path, impl_->options());
   RETURN_NOT_OK(ptr->Init());
   return ptr;
 }
@@ -1599,5 +1743,14 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
   return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
 }
 
+//
+// Top-level utility functions
+//
+
+Result<std::string> ResolveBucketRegion(const std::string& bucket) {
+  ARROW_ASSIGN_OR_RAISE(auto resolver, RegionResolver::DefaultInstance());
+  return resolver->ResolveRegion(bucket);
+}
+
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index aa39c30..868ef44 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -95,10 +95,12 @@ struct ARROW_EXPORT S3Options {
   /// This is recommended if you use the standard AWS environment variables
   /// and/or configuration file.
   static S3Options Defaults();
+
   /// \brief Initialize with anonymous credentials.
   ///
   /// This will only let you access public buckets.
   static S3Options Anonymous();
+
   /// \brief Initialize with explicit access and secret key.
   ///
   /// Optionally, a session token may also be provided for temporary credentials
@@ -106,11 +108,13 @@ struct ARROW_EXPORT S3Options {
   static S3Options FromAccessKey(const std::string& access_key,
                                  const std::string& secret_key,
                                  const std::string& session_token = "");
+
   /// \brief Initialize from an assumed role.
   static S3Options FromAssumeRole(
       const std::string& role_arn, const std::string& session_name = "",
       const std::string& external_id = "", int load_frequency = 900,
       const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);
+
   static Result<S3Options> FromUri(const ::arrow::internal::Uri& uri,
                                    std::string* out_path = NULLPTR);
   static Result<S3Options> FromUri(const std::string& uri,
@@ -216,5 +220,8 @@ Status EnsureS3Initialized();
 ARROW_EXPORT
 Status FinalizeS3();
 
+ARROW_EXPORT
+Result<std::string> ResolveBucketRegion(const std::string& bucket);
+
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
index 548b046..0beb51d 100644
--- a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
@@ -43,7 +43,7 @@ DEFINE_string(access_key, "", "S3 access key");
 DEFINE_string(secret_key, "", "S3 secret key");
 
 DEFINE_string(bucket, "", "bucket name");
-DEFINE_string(region, arrow::fs::kS3DefaultRegion, "AWS region");
+DEFINE_string(region, "", "AWS region");
 DEFINE_string(endpoint, "", "Endpoint override (e.g. '127.0.0.1:9000')");
 DEFINE_string(scheme, "https", "Connection scheme");
 
@@ -201,6 +201,10 @@ void TestMain(int argc, char** argv) {
                           : (FLAGS_verbose ? S3LogLevel::Warn : S3LogLevel::Fatal);
   ASSERT_OK(InitializeS3(options));
 
+  if (FLAGS_region.empty()) {
+    ASSERT_OK_AND_ASSIGN(FLAGS_region, ResolveBucketRegion(FLAGS_bucket));
+  }
+
   if (FLAGS_clear) {
     ClearBucket(argc, argv);
   } else if (FLAGS_test) {
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index ac9fb31..4f3083f 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -38,6 +38,7 @@
 // cpp/cmake_modules/BuildUtils.cmake for details.
 #include <boost/process.hpp>
 
+#include <gmock/gmock-matchers.h>
 #include <gtest/gtest.h>
 
 #ifdef _WIN32
@@ -113,6 +114,17 @@ namespace bp = boost::process;
   ARROW_AWS_ASSIGN_OR_FAIL_IMPL(             \
       ARROW_AWS_ASSIGN_OR_FAIL_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr);
 
+class AwsTestMixin : public ::testing::Test {
+ public:
+  void SetUp() {
+    // we set this environment variable to speed up tests by ensuring
+    // DefaultAWSCredentialsProviderChain does not query (inaccessible)
+    // EC2 metadata endpoint
+    ASSERT_OK(SetEnvVar("AWS_EC2_METADATA_DISABLED", "true"));
+  }
+  void TearDown() { ASSERT_OK(DelEnvVar("AWS_EC2_METADATA_DISABLED")); }
+};
+
 class S3TestMixin : public ::testing::Test {
  public:
   void SetUp() override {
@@ -164,16 +176,7 @@ void AssertObjectContents(Aws::S3::S3Client* client, const std::string& bucket,
 ////////////////////////////////////////////////////////////////////////////
 // S3Options tests
 
-class S3OptionsTest : public ::testing::Test {
- public:
-  void SetUp() {
-    // we set this environment variable to speed up tests by ensuring
-    // DefaultAWSCredentialsProviderChain does not query (inaccessible)
-    // EC2 metadata endpoint
-    ASSERT_OK(SetEnvVar("AWS_EC2_METADATA_DISABLED", "true"));
-  }
-  void TearDown() { ASSERT_OK(DelEnvVar("AWS_EC2_METADATA_DISABLED")); }
-};
+class S3OptionsTest : public AwsTestMixin {};
 
 TEST_F(S3OptionsTest, FromUri) {
   std::string path;
@@ -255,6 +258,34 @@ TEST_F(S3OptionsTest, FromAssumeRole) {
 }
 
 ////////////////////////////////////////////////////////////////////////////
+// Region resolution test
+
+class S3RegionResolutionTest : public AwsTestMixin {};
+
+TEST_F(S3RegionResolutionTest, PublicBucket) {
+  ASSERT_OK_AND_EQ("us-east-2", ResolveBucketRegion("ursa-labs-taxi-data"));
+
+  // Taken from a registry of open S3-hosted datasets
+  // at https://github.com/awslabs/open-data-registry
+  ASSERT_OK_AND_EQ("eu-west-2", ResolveBucketRegion("aws-earth-mo-atmospheric-ukv-prd"));
+  // Same again, cached
+  ASSERT_OK_AND_EQ("eu-west-2", ResolveBucketRegion("aws-earth-mo-atmospheric-ukv-prd"));
+}
+
+TEST_F(S3RegionResolutionTest, RestrictedBucket) {
+  ASSERT_OK_AND_EQ("us-west-2", ResolveBucketRegion("ursa-labs-r-test"));
+  // Same again, cached
+  ASSERT_OK_AND_EQ("us-west-2", ResolveBucketRegion("ursa-labs-r-test"));
+}
+
+TEST_F(S3RegionResolutionTest, NonExistentBucket) {
+  auto maybe_region = ResolveBucketRegion("ursa-labs-non-existent-bucket");
+  ASSERT_RAISES(IOError, maybe_region);
+  ASSERT_THAT(maybe_region.status().message(),
+              ::testing::HasSubstr("Bucket 'ursa-labs-non-existent-bucket' not found"));
+}
+
+////////////////////////////////////////////////////////////////////////////
 // Basic test for the Minio test server.
 
 class TestMinioServer : public S3TestMixin {
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 0c9e74e..f95edb0 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -22,7 +22,7 @@
 #include <type_traits>
 #include <vector>
 
-#include "arrow/type_fwd.h"
+#include "arrow/type.h"
 #include "arrow/util/bit_util.h"
 
 namespace arrow {
diff --git a/cpp/src/arrow/util/atomic_shared_ptr.h b/cpp/src/arrow/util/atomic_shared_ptr.h
index 08f12b3..d93ad92 100644
--- a/cpp/src/arrow/util/atomic_shared_ptr.h
+++ b/cpp/src/arrow/util/atomic_shared_ptr.h
@@ -47,13 +47,13 @@ using enable_if_atomic_load_shared_ptr_unavailable =
     enable_if_t<!is_atomic_load_shared_ptr_available<T>::value, T>;
 
 template <class T>
-inline enable_if_atomic_load_shared_ptr_available<std::shared_ptr<T>> atomic_load(
+enable_if_atomic_load_shared_ptr_available<std::shared_ptr<T>> atomic_load(
     const std::shared_ptr<T>* p) {
   return std::atomic_load(p);
 }
 
 template <class T>
-inline enable_if_atomic_load_shared_ptr_unavailable<std::shared_ptr<T>> atomic_load(
+enable_if_atomic_load_shared_ptr_unavailable<std::shared_ptr<T>> atomic_load(
     const std::shared_ptr<T>* p) {
   return *p;
 }
@@ -76,18 +76,36 @@ using enable_if_atomic_store_shared_ptr_unavailable =
     enable_if_t<!is_atomic_store_shared_ptr_available<T>::value, T>;
 
 template <class T>
-inline void atomic_store(
-    enable_if_atomic_store_shared_ptr_available<std::shared_ptr<T>*> p,
-    std::shared_ptr<T> r) {
+void atomic_store(enable_if_atomic_store_shared_ptr_available<std::shared_ptr<T>*> p,
+                  std::shared_ptr<T> r) {
   std::atomic_store(p, std::move(r));
 }
 
 template <class T>
-inline void atomic_store(
-    enable_if_atomic_store_shared_ptr_unavailable<std::shared_ptr<T>*> p,
-    std::shared_ptr<T> r) {
+void atomic_store(enable_if_atomic_store_shared_ptr_unavailable<std::shared_ptr<T>*> p,
+                  std::shared_ptr<T> r) {
   *p = r;
 }
 
+template <class T>
+bool atomic_compare_exchange_strong(
+    enable_if_atomic_store_shared_ptr_available<std::shared_ptr<T>*> p,
+    std::shared_ptr<T>* expected, std::shared_ptr<T> desired) {
+  return std::atomic_compare_exchange_strong(p, expected, std::move(desired));
+}
+
+template <class T>
+bool atomic_compare_exchange_strong(
+    enable_if_atomic_store_shared_ptr_unavailable<std::shared_ptr<T>*> p,
+    std::shared_ptr<T>* expected, std::shared_ptr<T> desired) {
+  if (*p == *expected) {
+    *p = std::move(desired);
+    return true;
+  } else {
+    *expected = *p;
+    return false;
+  }
+}
+
 }  // namespace internal
 }  // namespace arrow
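
The new compare-exchange helper supports the publish-once pattern used by
RegionResolver::DefaultInstance() above. A standalone sketch of that pattern,
where the Registry type and GetRegistry function are hypothetical:

    #include <memory>

    #include "arrow/util/atomic_shared_ptr.h"

    struct Registry {};  // hypothetical payload type

    std::shared_ptr<Registry> GetRegistry() {
      static std::shared_ptr<Registry> instance;
      auto existing = arrow::internal::atomic_load(&instance);
      if (existing) {
        return existing;  // fast path: an instance was already published
      }
      auto fresh = std::make_shared<Registry>();
      std::shared_ptr<Registry> expected;  // null: we expect no instance yet
      if (arrow::internal::atomic_compare_exchange_strong(&instance, &expected,
                                                          fresh)) {
        return fresh;  // this thread won the race and published its instance
      }
      return expected;  // another thread won; adopt the instance it published
    }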
diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx
index 6720dff..81daad5 100644
--- a/python/pyarrow/_s3fs.pyx
+++ b/python/pyarrow/_s3fs.pyx
@@ -212,3 +212,10 @@ cdef class S3FileSystem(FileSystem):
                 background_writes=opts.background_writes
             ),)
         )
+
+    @property
+    def region(self):
+        """
+        The AWS region this filesystem connects to.
+        """
+        return frombytes(self.s3fs.options().region)
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index ba2ae8a..cb4f4e8 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -995,9 +995,10 @@ def test_s3_options(monkeypatch):
     monkeypatch.setenv("AWS_EC2_METADATA_DISABLED", "true")
 
     fs = S3FileSystem(access_key='access', secret_key='secret',
-                      session_token='token', region='us-east-1',
+                      session_token='token', region='us-east-2',
                       scheme='https', endpoint_override='localhost:8999')
     assert isinstance(fs, S3FileSystem)
+    assert fs.region == 'us-east-2'
     assert pickle.loads(pickle.dumps(fs)) == fs
 
     fs = S3FileSystem(role_arn='role', session_name='session',
@@ -1319,5 +1320,29 @@ def test_s3_real_aws():
     # This is a minimal integration check for ARROW-9261 and similar issues.
     from pyarrow.fs import S3FileSystem
     fs = S3FileSystem(anonymous=True)
+    assert fs.region == 'us-east-1'  # default region
     entries = fs.get_file_info(FileSelector('ursa-labs-taxi-data'))
     assert len(entries) > 0
+
+
+@pytest.mark.s3
+def test_s3_real_aws_region_selection():
+    # Taken from a registry of open S3-hosted datasets
+    # at https://github.com/awslabs/open-data-registry
+    fs, path = FileSystem.from_uri('s3://mf-nwp-models/README.txt')
+    assert fs.region == 'eu-west-1'
+    with fs.open_input_stream(path) as f:
+        assert b"Meteo-France Atmospheric models on AWS" in f.read(50)
+
+    # Passing an explicit region disables auto-selection
+    fs, path = FileSystem.from_uri(
+        's3://mf-nwp-models/README.txt?region=us-east-2')
+    assert fs.region == 'us-east-2'
+    # Reading from the wrong region may still work for public buckets...
+
+    # Non-existent bucket (hopefully, otherwise need to fix this test)
+    with pytest.raises(IOError, match="Bucket '.*' not found"):
+        FileSystem.from_uri('s3://x-arrow-non-existent-bucket')
+    fs, path = FileSystem.from_uri(
+        's3://x-arrow-non-existent-bucket?region=us-east-3')
+    assert fs.region == 'us-east-3'
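
A C++ counterpart of the checks above, sketched with
arrow::fs::FileSystemFromUri() from arrow/filesystem/filesystem.h; the
FromUriExample wrapper is hypothetical:

    #include <memory>
    #include <string>

    #include "arrow/filesystem/filesystem.h"
    #include "arrow/io/interfaces.h"
    #include "arrow/result.h"
    #include "arrow/status.h"

    arrow::Status FromUriExample() {
      std::string path;
      // No region given: it is resolved automatically from the bucket.
      ARROW_ASSIGN_OR_RAISE(
          auto fs,
          arrow::fs::FileSystemFromUri("s3://mf-nwp-models/README.txt", &path));
      ARROW_ASSIGN_OR_RAISE(auto stream, fs->OpenInputStream(path));
      RETURN_NOT_OK(stream->Close());

      // An explicit region in the URI disables auto-selection.
      ARROW_ASSIGN_OR_RAISE(
          fs, arrow::fs::FileSystemFromUri(
                  "s3://mf-nwp-models/README.txt?region=us-east-2", &path));
      return arrow::Status::OK();
    }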