Posted to commits@arrow.apache.org by ap...@apache.org on 2020/09/21 18:14:09 UTC
[arrow] branch master updated: ARROW-9775: [C++] Automatic S3 region selection
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 248803c ARROW-9775: [C++] Automatic S3 region selection
248803c is described below
commit 248803c32c4407c60e3dabce28539348291e3d7a
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Sep 21 20:13:40 2020 +0200
ARROW-9775: [C++] Automatic S3 region selection
Once looked up, the region of a given bucket is cached for the lifetime of the process.
(The cache size is unbounded, which shouldn't be a problem in practice.)
S3Options::FromUri() now resolves the bucket's region if it wasn't given as a URI query parameter.
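A minimal usage sketch of the new functionality (illustration only, not part of the patch; the Example() wrapper is hypothetical, the bucket is the public test bucket used in the tests below, and S3 must have been initialized first, e.g. via InitializeS3()):

    #include <string>

    #include "arrow/filesystem/s3fs.h"
    #include "arrow/result.h"
    #include "arrow/status.h"

    arrow::Status Example() {
      // Direct lookup: the resolved region is cached for the process lifetime,
      // so repeated calls for the same bucket hit the network only once.
      ARROW_ASSIGN_OR_RAISE(auto region,
                            arrow::fs::ResolveBucketRegion("ursa-labs-taxi-data"));

      // FromUri() now performs the same lookup when the URI carries no
      // explicit "region" query parameter (and no endpoint_override is set).
      std::string path;
      ARROW_ASSIGN_OR_RAISE(
          auto options,
          arrow::fs::S3Options::FromUri("s3://ursa-labs-taxi-data/2019", &path));

      if (options.region != region) {
        return arrow::Status::Invalid("Unexpected region mismatch");
      }
      return arrow::Status::OK();
    }

Because the cache is shared process-wide, the second resolution inside FromUri() is served from memory rather than issuing another HEAD request.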
Closes #8205 from pitrou/ARROW-9775-s3-auto-region
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/filesystem/s3fs.cc | 219 ++++++++++++++++++++----
cpp/src/arrow/filesystem/s3fs.h | 7 +
cpp/src/arrow/filesystem/s3fs_narrative_test.cc | 6 +-
cpp/src/arrow/filesystem/s3fs_test.cc | 51 ++++--
cpp/src/arrow/type_traits.h | 2 +-
cpp/src/arrow/util/atomic_shared_ptr.h | 34 +++-
python/pyarrow/_s3fs.pyx | 7 +
python/pyarrow/tests/test_fs.py | 27 ++-
8 files changed, 299 insertions(+), 54 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 403f278..ebc2ea2 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -36,9 +36,11 @@
#endif
#include <aws/core/Aws.h>
+#include <aws/core/Region.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/RetryStrategy.h>
+#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
@@ -71,6 +73,7 @@
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/util/atomic_shared_ptr.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/windows_fixup.h"
@@ -161,6 +164,9 @@ Status EnsureS3Initialized() {
return Status::OK();
}
+// -----------------------------------------------------------------------
+// S3Options implementation
+
void S3Options::ConfigureDefaultCredentials() {
credentials_provider =
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
@@ -271,9 +277,11 @@ Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
options.ConfigureDefaultCredentials();
}
+ bool region_set = false;
for (const auto& kv : options_map) {
if (kv.first == "region") {
options.region = kv.second;
+ region_set = true;
} else if (kv.first == "scheme") {
options.scheme = kv.second;
} else if (kv.first == "endpoint_override") {
@@ -283,6 +291,11 @@ Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
}
}
+ if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
+ // XXX Should we use a dedicated resolver with the given credentials?
+ ARROW_ASSIGN_OR_RAISE(options.region, ResolveBucketRegion(bucket));
+ }
+
return options;
}
@@ -403,6 +416,161 @@ std::string FormatRange(int64_t start, int64_t length) {
return ss.str();
}
+class S3Client : public Aws::S3::S3Client {
+ public:
+ using Aws::S3::S3Client::S3Client;
+
+ // To get a bucket's region, we must extract the "x-amz-bucket-region" header
+ // from the response to a HEAD bucket request.
+ // Unfortunately, the S3Client APIs don't let us access the headers of successful
+ // responses. So we have to cook an AWS request and issue it ourselves.
+
+ Result<std::string> GetBucketRegion(const S3Model::HeadBucketRequest& request) {
+ auto uri = GeneratePresignedUrl(request.GetBucket(),
+ /*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD);
+ // NOTE: The signer region argument isn't passed here, as there's no easy
+ // way of computing it (the relevant method is private).
+ auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD,
+ Aws::Auth::SIGV4_SIGNER);
+ const auto code = outcome.IsSuccess() ? outcome.GetResult().GetResponseCode()
+ : outcome.GetError().GetResponseCode();
+ const auto& headers = outcome.IsSuccess()
+ ? outcome.GetResult().GetHeaderValueCollection()
+ : outcome.GetError().GetResponseHeaders();
+
+ const auto it = headers.find(ToAwsString("x-amz-bucket-region"));
+ if (it == headers.end()) {
+ if (code == Aws::Http::HttpResponseCode::NOT_FOUND) {
+ return Status::IOError("Bucket '", request.GetBucket(), "' not found");
+ } else if (!outcome.IsSuccess()) {
+ return ErrorToStatus(std::forward_as_tuple("When resolving region for bucket '",
+ request.GetBucket(), "': "),
+ outcome.GetError());
+ } else {
+ return Status::IOError("When resolving region for bucket '", request.GetBucket(),
+ "': missing 'x-amz-bucket-region' header in response");
+ }
+ }
+ return std::string(FromAwsString(it->second));
+ }
+
+ Result<std::string> GetBucketRegion(const std::string& bucket) {
+ S3Model::HeadBucketRequest req;
+ req.SetBucket(ToAwsString(bucket));
+ return GetBucketRegion(req);
+ }
+};
+
+class ClientBuilder {
+ public:
+ explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
+
+ Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }
+
+ Result<std::unique_ptr<S3Client>> BuildClient() {
+ credentials_provider_ = options_.credentials_provider;
+ client_config_.region = ToAwsString(options_.region);
+ client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
+ if (options_.scheme == "http") {
+ client_config_.scheme = Aws::Http::Scheme::HTTP;
+ } else if (options_.scheme == "https") {
+ client_config_.scheme = Aws::Http::Scheme::HTTPS;
+ } else {
+ return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
+ }
+ client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
+ if (!internal::global_options.tls_ca_file_path.empty()) {
+ client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
+ }
+ if (!internal::global_options.tls_ca_dir_path.empty()) {
+ client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
+ }
+
+ const bool use_virtual_addressing = options_.endpoint_override.empty();
+ return std::unique_ptr<S3Client>(
+ new S3Client(credentials_provider_, client_config_,
+ Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+ use_virtual_addressing));
+ }
+
+ const S3Options& options() const { return options_; }
+
+ protected:
+ S3Options options_;
+ Aws::Client::ClientConfiguration client_config_;
+ std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
+};
+
+// -----------------------------------------------------------------------
+// S3 region resolver
+
+class RegionResolver {
+ public:
+ static Result<std::shared_ptr<RegionResolver>> Make(S3Options options) {
+ std::shared_ptr<RegionResolver> resolver(new RegionResolver(std::move(options)));
+ RETURN_NOT_OK(resolver->Init());
+ return resolver;
+ }
+
+ static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
+ static std::shared_ptr<RegionResolver> instance;
+ auto resolver = arrow::internal::atomic_load(&instance);
+ if (resolver) {
+ return resolver;
+ }
+ auto maybe_resolver = Make(S3Options::Anonymous());
+ if (!maybe_resolver.ok()) {
+ return maybe_resolver;
+ }
+ // Make sure to always return the same instance even if several threads
+ // call DefaultInstance at once.
+ std::shared_ptr<RegionResolver> existing;
+ if (arrow::internal::atomic_compare_exchange_strong(&instance, &existing,
+ *maybe_resolver)) {
+ return *maybe_resolver;
+ } else {
+ return existing;
+ }
+ }
+
+ Result<std::string> ResolveRegion(const std::string& bucket) {
+ std::unique_lock<std::mutex> lock(cache_mutex_);
+ auto it = cache_.find(bucket);
+ if (it != cache_.end()) {
+ return it->second;
+ }
+ lock.unlock();
+ ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
+ lock.lock();
+ // Note we don't cache a non-existent bucket, as the bucket could be created later
+ cache_[bucket] = region;
+ return region;
+ }
+
+ Result<std::string> ResolveRegionUncached(const std::string& bucket) {
+ return client_->GetBucketRegion(bucket);
+ }
+
+ protected:
+ explicit RegionResolver(S3Options options) : builder_(std::move(options)) {}
+
+ Status Init() {
+ DCHECK(builder_.options().endpoint_override.empty());
+ return builder_.BuildClient().Value(&client_);
+ }
+
+ ClientBuilder builder_;
+ std::unique_ptr<S3Client> client_;
+
+ std::mutex cache_mutex_;
+ // XXX Should cache size be bounded? It must be quite unusual to query millions
+ // of different buckets in a single program invocation...
+ std::unordered_map<std::string, std::string> cache_;
+};
+
+// -----------------------------------------------------------------------
+// S3 file stream implementations
+
// A non-copying iostream.
// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
@@ -894,11 +1062,12 @@ void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
} // namespace
+// -----------------------------------------------------------------------
+// S3 filesystem implementation
+
class S3FileSystem::Impl {
public:
- S3Options options_;
- Aws::Client::ClientConfiguration client_config_;
- std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
+ ClientBuilder builder_;
std::unique_ptr<Aws::S3::S3Client> client_;
const int32_t kListObjectsMaxKeys = 1000;
@@ -907,36 +1076,11 @@ class S3FileSystem::Impl {
// Limit recursing depth, since a recursion bomb can be created
const int32_t kMaxNestingDepth = 100;
- explicit Impl(S3Options options) : options_(std::move(options)) {}
+ explicit Impl(S3Options options) : builder_(std::move(options)) {}
- Status Init() {
- credentials_provider_ = options_.credentials_provider;
- client_config_.region = ToAwsString(options_.region);
- client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
- if (options_.scheme == "http") {
- client_config_.scheme = Aws::Http::Scheme::HTTP;
- } else if (options_.scheme == "https") {
- client_config_.scheme = Aws::Http::Scheme::HTTPS;
- } else {
- return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
- }
- client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
- if (!internal::global_options.tls_ca_file_path.empty()) {
- client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
- }
- if (!internal::global_options.tls_ca_dir_path.empty()) {
- client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
- }
-
- bool use_virtual_addressing = options_.endpoint_override.empty();
- client_.reset(
- new Aws::S3::S3Client(credentials_provider_, client_config_,
- Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
- use_virtual_addressing));
- return Status::OK();
- }
+ Status Init() { return builder_.BuildClient().Value(&client_); }
- S3Options options() const { return options_; }
+ const S3Options& options() const { return builder_.options(); }
// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
@@ -944,7 +1088,7 @@ class S3FileSystem::Impl {
S3Model::CreateBucketRequest req;
config.SetLocationConstraint(
S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
- ToAwsString(options_.region)));
+ ToAwsString(options().region)));
req.SetBucket(ToAwsString(bucket));
req.SetCreateBucketConfiguration(config);
@@ -1586,7 +1730,7 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
RETURN_NOT_OK(ValidateFilePath(path));
auto ptr = std::make_shared<ObjectOutputStream>(
- shared_from_this(), impl_->client_.get(), path, impl_->options_);
+ shared_from_this(), impl_->client_.get(), path, impl_->options());
RETURN_NOT_OK(ptr->Init());
return ptr;
}
@@ -1599,5 +1743,14 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
}
+//
+// Top-level utility functions
+//
+
+Result<std::string> ResolveBucketRegion(const std::string& bucket) {
+ ARROW_ASSIGN_OR_RAISE(auto resolver, RegionResolver::DefaultInstance());
+ return resolver->ResolveRegion(bucket);
+}
+
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index aa39c30..868ef44 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -95,10 +95,12 @@ struct ARROW_EXPORT S3Options {
/// This is recommended if you use the standard AWS environment variables
/// and/or configuration file.
static S3Options Defaults();
+
/// \brief Initialize with anonymous credentials.
///
/// This will only let you access public buckets.
static S3Options Anonymous();
+
/// \brief Initialize with explicit access and secret key.
///
/// Optionally, a session token may also be provided for temporary credentials
@@ -106,11 +108,13 @@ struct ARROW_EXPORT S3Options {
static S3Options FromAccessKey(const std::string& access_key,
const std::string& secret_key,
const std::string& session_token = "");
+
/// \brief Initialize from an assumed role.
static S3Options FromAssumeRole(
const std::string& role_arn, const std::string& session_name = "",
const std::string& external_id = "", int load_frequency = 900,
const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);
+
static Result<S3Options> FromUri(const ::arrow::internal::Uri& uri,
std::string* out_path = NULLPTR);
static Result<S3Options> FromUri(const std::string& uri,
@@ -216,5 +220,8 @@ Status EnsureS3Initialized();
ARROW_EXPORT
Status FinalizeS3();
+ARROW_EXPORT
+Result<std::string> ResolveBucketRegion(const std::string& bucket);
+
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
index 548b046..0beb51d 100644
--- a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
@@ -43,7 +43,7 @@ DEFINE_string(access_key, "", "S3 access key");
DEFINE_string(secret_key, "", "S3 secret key");
DEFINE_string(bucket, "", "bucket name");
-DEFINE_string(region, arrow::fs::kS3DefaultRegion, "AWS region");
+DEFINE_string(region, "", "AWS region");
DEFINE_string(endpoint, "", "Endpoint override (e.g. '127.0.0.1:9000')");
DEFINE_string(scheme, "https", "Connection scheme");
@@ -201,6 +201,10 @@ void TestMain(int argc, char** argv) {
: (FLAGS_verbose ? S3LogLevel::Warn : S3LogLevel::Fatal);
ASSERT_OK(InitializeS3(options));
+ if (FLAGS_region.empty()) {
+ ASSERT_OK_AND_ASSIGN(FLAGS_region, ResolveBucketRegion(FLAGS_bucket));
+ }
+
if (FLAGS_clear) {
ClearBucket(argc, argv);
} else if (FLAGS_test) {
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index ac9fb31..4f3083f 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -38,6 +38,7 @@
// cpp/cmake_modules/BuildUtils.cmake for details.
#include <boost/process.hpp>
+#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
#ifdef _WIN32
@@ -113,6 +114,17 @@ namespace bp = boost::process;
ARROW_AWS_ASSIGN_OR_FAIL_IMPL( \
ARROW_AWS_ASSIGN_OR_FAIL_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr);
+class AwsTestMixin : public ::testing::Test {
+ public:
+ void SetUp() {
+ // We set this environment variable to speed up tests by ensuring
+ // DefaultAWSCredentialsProviderChain does not query the (inaccessible)
+ // EC2 metadata endpoint
+ ASSERT_OK(SetEnvVar("AWS_EC2_METADATA_DISABLED", "true"));
+ }
+ void TearDown() { ASSERT_OK(DelEnvVar("AWS_EC2_METADATA_DISABLED")); }
+};
+
class S3TestMixin : public ::testing::Test {
public:
void SetUp() override {
@@ -164,16 +176,7 @@ void AssertObjectContents(Aws::S3::S3Client* client, const std::string& bucket,
////////////////////////////////////////////////////////////////////////////
// S3Options tests
-class S3OptionsTest : public ::testing::Test {
- public:
- void SetUp() {
- // we set this environment variable to speed up tests by ensuring
- // DefaultAWSCredentialsProviderChain does not query (inaccessible)
- // EC2 metadata endpoint
- ASSERT_OK(SetEnvVar("AWS_EC2_METADATA_DISABLED", "true"));
- }
- void TearDown() { ASSERT_OK(DelEnvVar("AWS_EC2_METADATA_DISABLED")); }
-};
+class S3OptionsTest : public AwsTestMixin {};
TEST_F(S3OptionsTest, FromUri) {
std::string path;
@@ -255,6 +258,34 @@ TEST_F(S3OptionsTest, FromAssumeRole) {
}
////////////////////////////////////////////////////////////////////////////
+// Region resolution test
+
+class S3RegionResolutionTest : public AwsTestMixin {};
+
+TEST_F(S3RegionResolutionTest, PublicBucket) {
+ ASSERT_OK_AND_EQ("us-east-2", ResolveBucketRegion("ursa-labs-taxi-data"));
+
+ // Taken from a registry of open S3-hosted datasets
+ // at https://github.com/awslabs/open-data-registry
+ ASSERT_OK_AND_EQ("eu-west-2", ResolveBucketRegion("aws-earth-mo-atmospheric-ukv-prd"));
+ // Same again, cached
+ ASSERT_OK_AND_EQ("eu-west-2", ResolveBucketRegion("aws-earth-mo-atmospheric-ukv-prd"));
+}
+
+TEST_F(S3RegionResolutionTest, RestrictedBucket) {
+ ASSERT_OK_AND_EQ("us-west-2", ResolveBucketRegion("ursa-labs-r-test"));
+ // Same again, cached
+ ASSERT_OK_AND_EQ("us-west-2", ResolveBucketRegion("ursa-labs-r-test"));
+}
+
+TEST_F(S3RegionResolutionTest, NonExistentBucket) {
+ auto maybe_region = ResolveBucketRegion("ursa-labs-non-existent-bucket");
+ ASSERT_RAISES(IOError, maybe_region);
+ ASSERT_THAT(maybe_region.status().message(),
+ ::testing::HasSubstr("Bucket 'ursa-labs-non-existent-bucket' not found"));
+}
+
+////////////////////////////////////////////////////////////////////////////
// Basic test for the Minio test server.
class TestMinioServer : public S3TestMixin {
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 0c9e74e..f95edb0 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -22,7 +22,7 @@
#include <type_traits>
#include <vector>
-#include "arrow/type_fwd.h"
+#include "arrow/type.h"
#include "arrow/util/bit_util.h"
namespace arrow {
diff --git a/cpp/src/arrow/util/atomic_shared_ptr.h b/cpp/src/arrow/util/atomic_shared_ptr.h
index 08f12b3..d93ad92 100644
--- a/cpp/src/arrow/util/atomic_shared_ptr.h
+++ b/cpp/src/arrow/util/atomic_shared_ptr.h
@@ -47,13 +47,13 @@ using enable_if_atomic_load_shared_ptr_unavailable =
enable_if_t<!is_atomic_load_shared_ptr_available<T>::value, T>;
template <class T>
-inline enable_if_atomic_load_shared_ptr_available<std::shared_ptr<T>> atomic_load(
+enable_if_atomic_load_shared_ptr_available<std::shared_ptr<T>> atomic_load(
const std::shared_ptr<T>* p) {
return std::atomic_load(p);
}
template <class T>
-inline enable_if_atomic_load_shared_ptr_unavailable<std::shared_ptr<T>> atomic_load(
+enable_if_atomic_load_shared_ptr_unavailable<std::shared_ptr<T>> atomic_load(
const std::shared_ptr<T>* p) {
return *p;
}
@@ -76,18 +76,36 @@ using enable_if_atomic_store_shared_ptr_unavailable =
enable_if_t<!is_atomic_store_shared_ptr_available<T>::value, T>;
template <class T>
-inline void atomic_store(
- enable_if_atomic_store_shared_ptr_available<std::shared_ptr<T>*> p,
- std::shared_ptr<T> r) {
+void atomic_store(enable_if_atomic_store_shared_ptr_available<std::shared_ptr<T>*> p,
+ std::shared_ptr<T> r) {
std::atomic_store(p, std::move(r));
}
template <class T>
-inline void atomic_store(
- enable_if_atomic_store_shared_ptr_unavailable<std::shared_ptr<T>*> p,
- std::shared_ptr<T> r) {
+void atomic_store(enable_if_atomic_store_shared_ptr_unavailable<std::shared_ptr<T>*> p,
+ std::shared_ptr<T> r) {
*p = r;
}
+template <class T>
+bool atomic_compare_exchange_strong(
+ enable_if_atomic_store_shared_ptr_available<std::shared_ptr<T>*> p,
+ std::shared_ptr<T>* expected, std::shared_ptr<T> desired) {
+ return std::atomic_compare_exchange_strong(p, expected, std::move(desired));
+}
+
+template <class T>
+bool atomic_compare_exchange_strong(
+ enable_if_atomic_store_shared_ptr_unavailable<std::shared_ptr<T>*> p,
+ std::shared_ptr<T>* expected, std::shared_ptr<T> desired) {
+ if (*p == *expected) {
+ *p = std::move(desired);
+ return true;
+ } else {
+ *expected = *p;
+ return false;
+ }
+}
+
} // namespace internal
} // namespace arrow
diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx
index 6720dff..81daad5 100644
--- a/python/pyarrow/_s3fs.pyx
+++ b/python/pyarrow/_s3fs.pyx
@@ -212,3 +212,10 @@ cdef class S3FileSystem(FileSystem):
background_writes=opts.background_writes
),)
)
+
+ @property
+ def region(self):
+ """
+ The AWS region this filesystem connects to.
+ """
+ return frombytes(self.s3fs.options().region)
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index ba2ae8a..cb4f4e8 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -995,9 +995,10 @@ def test_s3_options(monkeypatch):
monkeypatch.setenv("AWS_EC2_METADATA_DISABLED", "true")
fs = S3FileSystem(access_key='access', secret_key='secret',
- session_token='token', region='us-east-1',
+ session_token='token', region='us-east-2',
scheme='https', endpoint_override='localhost:8999')
assert isinstance(fs, S3FileSystem)
+ assert fs.region == 'us-east-2'
assert pickle.loads(pickle.dumps(fs)) == fs
fs = S3FileSystem(role_arn='role', session_name='session',
@@ -1319,5 +1320,29 @@ def test_s3_real_aws():
# This is a minimal integration check for ARROW-9261 and similar issues.
from pyarrow.fs import S3FileSystem
fs = S3FileSystem(anonymous=True)
+ assert fs.region == 'us-east-1' # default region
entries = fs.get_file_info(FileSelector('ursa-labs-taxi-data'))
assert len(entries) > 0
+
+
+@pytest.mark.s3
+def test_s3_real_aws_region_selection():
+ # Taken from a registry of open S3-hosted datasets
+ # at https://github.com/awslabs/open-data-registry
+ fs, path = FileSystem.from_uri('s3://mf-nwp-models/README.txt')
+ assert fs.region == 'eu-west-1'
+ with fs.open_input_stream(path) as f:
+ assert b"Meteo-France Atmospheric models on AWS" in f.read(50)
+
+ # Passing an explicit region disables auto-selection
+ fs, path = FileSystem.from_uri(
+ 's3://mf-nwp-models/README.txt?region=us-east-2')
+ assert fs.region == 'us-east-2'
+ # Reading from the wrong region may still work for public buckets...
+
+ # Non-existent bucket (hopefully, otherwise need to fix this test)
+ with pytest.raises(IOError, match="Bucket '.*' not found"):
+ FileSystem.from_uri('s3://x-arrow-non-existent-bucket')
+ fs, path = FileSystem.from_uri(
+ 's3://x-arrow-non-existent-bucket?region=us-east-3')
+ assert fs.region == 'us-east-3'