You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2023/06/22 04:23:49 UTC

[arrow] branch main updated: GH-36227: [C++] New GcsOption to set the project id (#36228)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ec413b7763 GH-36227: [C++] New GcsOption to set the project id (#36228)
ec413b7763 is described below

commit ec413b7763fec3694369b2ab101863007f338c3d
Author: Carlos O'Ryan <co...@google.com>
AuthorDate: Thu Jun 22 00:23:43 2023 -0400

    GH-36227: [C++] New GcsOption to set the project id (#36228)
    
    ### Rationale for this change
    
    This fixes #36227. The change was originally motivated by the problems in #36119, but it seems like a valuable feature in any case.
    
    ### What changes are included in this PR?
    
    - Refactor some code to make it testable.
    - Add a new `std::optional<std::string>` field to the `GcsOptions` class.
    
    ### Are these changes tested?
    
    Yes, I expanded the unit tests.
    
    ### Are there any user-facing changes?
    
    Yes. I updated the field documentation.  If I missed some documentation please let me know.
    
    I am also not familiar with the steps required to update the Python wrappers. If there is some documentation to follow, I would appreciate it.  I can expand this PR or send a separate one, your call.
    
    * Closes: #36227
    
    Authored-by: Carlos O'Ryan <co...@google.com>
    Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
 cpp/src/arrow/filesystem/gcsfs.cc          | 63 ++++++------------------------
 cpp/src/arrow/filesystem/gcsfs.h           | 17 +++++++-
 cpp/src/arrow/filesystem/gcsfs_internal.cc | 40 +++++++++++++++++++
 cpp/src/arrow/filesystem/gcsfs_internal.h  | 13 ++++++
 cpp/src/arrow/filesystem/gcsfs_test.cc     | 34 +++++++++++++++-
 5 files changed, 113 insertions(+), 54 deletions(-)

diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc
index 6fc75589b0..d41cb49022 100644
--- a/cpp/src/arrow/filesystem/gcsfs.cc
+++ b/cpp/src/arrow/filesystem/gcsfs.cc
@@ -35,12 +35,6 @@
 
 namespace arrow {
 namespace fs {
-struct GcsCredentialsHolder {
-  // Constructor needed for make_shared
-  explicit GcsCredentialsHolder(std::shared_ptr<google::cloud::Credentials> credentials)
-      : credentials(std::move(credentials)) {}
-  std::shared_ptr<google::cloud::Credentials> credentials;
-};
 
 bool GcsCredentials::Equals(const GcsCredentials& other) const {
   if (holder_->credentials == other.holder_->credentials) {
@@ -58,14 +52,6 @@ namespace gcs = google::cloud::storage;
 using GcsCode = google::cloud::StatusCode;
 using GcsStatus = google::cloud::Status;
 
-// Change the default upload buffer size. In general, sending larger buffers is more
-// efficient with GCS, as each buffer requires a roundtrip to the service. With formatted
-// output (when using `operator<<`), keeping a larger buffer in memory before uploading
-// makes sense.  With unformatted output (the only choice given gcs::io::OutputStream's
-// API) it is better to let the caller provide as large a buffer as they want. The GCS C++
-// client library will upload this buffer with zero copies if possible.
-auto constexpr kUploadBufferSize = 256 * 1024;
-
 struct GcsPath {
   std::string full_path;
   std::string bucket;
@@ -334,40 +320,12 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
   std::shared_ptr<GcsInputStream> mutable stream_;
 };
 
-google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
-  auto options = google::cloud::Options{};
-  std::string scheme = o.scheme;
-  if (scheme.empty()) scheme = "https";
-  if (scheme == "https") {
-    options.set<google::cloud::UnifiedCredentialsOption>(
-        google::cloud::MakeGoogleDefaultCredentials());
-  } else {
-    options.set<google::cloud::UnifiedCredentialsOption>(
-        google::cloud::MakeInsecureCredentials());
-  }
-  options.set<gcs::UploadBufferSizeOption>(kUploadBufferSize);
-  if (!o.endpoint_override.empty()) {
-    options.set<gcs::RestEndpointOption>(scheme + "://" + o.endpoint_override);
-  }
-  if (o.credentials.holder() && o.credentials.holder()->credentials) {
-    options.set<google::cloud::UnifiedCredentialsOption>(
-        o.credentials.holder()->credentials);
-  }
-  if (o.retry_limit_seconds.has_value()) {
-    options.set<gcs::RetryPolicyOption>(
-        gcs::LimitedTimeRetryPolicy(
-            std::chrono::milliseconds(static_cast<int>(*o.retry_limit_seconds * 1000)))
-            .clone());
-  }
-  return options;
-}
-
 }  // namespace
 
 class GcsFileSystem::Impl {
  public:
   explicit Impl(GcsOptions o)
-      : options_(std::move(o)), client_(AsGoogleCloudOptions(options_)) {}
+      : options_(std::move(o)), client_(internal::AsGoogleCloudOptions(options_)) {}
 
   const GcsOptions& options() const { return options_; }
 
@@ -731,7 +689,7 @@ class GcsFileSystem::Impl {
 };
 
 GcsOptions::GcsOptions() {
-  this->credentials.holder_ = std::make_shared<GcsCredentialsHolder>(
+  this->credentials.holder_ = std::make_shared<internal::GcsCredentialsHolder>(
       google::cloud::MakeGoogleDefaultCredentials());
   this->scheme = "https";
 }
@@ -740,7 +698,8 @@ bool GcsOptions::Equals(const GcsOptions& other) const {
   return credentials.Equals(other.credentials) &&
          endpoint_override == other.endpoint_override && scheme == other.scheme &&
          default_bucket_location == other.default_bucket_location &&
-         retry_limit_seconds == other.retry_limit_seconds;
+         retry_limit_seconds == other.retry_limit_seconds &&
+         project_id == other.project_id;
 }
 
 GcsOptions GcsOptions::Defaults() {
@@ -750,8 +709,8 @@ GcsOptions GcsOptions::Defaults() {
 
 GcsOptions GcsOptions::Anonymous() {
   GcsOptions options{};
-  options.credentials.holder_ =
-      std::make_shared<GcsCredentialsHolder>(google::cloud::MakeInsecureCredentials());
+  options.credentials.holder_ = std::make_shared<internal::GcsCredentialsHolder>(
+      google::cloud::MakeInsecureCredentials());
   options.credentials.anonymous_ = true;
   options.scheme = "http";
   return options;
@@ -760,8 +719,8 @@ GcsOptions GcsOptions::Anonymous() {
 GcsOptions GcsOptions::FromAccessToken(const std::string& access_token,
                                        TimePoint expiration) {
   GcsOptions options{};
-  options.credentials.holder_ =
-      std::make_shared<GcsCredentialsHolder>(google::cloud::MakeAccessTokenCredentials(
+  options.credentials.holder_ = std::make_shared<internal::GcsCredentialsHolder>(
+      google::cloud::MakeAccessTokenCredentials(
           access_token,
           std::chrono::time_point_cast<std::chrono::system_clock::time_point::duration>(
               expiration)));
@@ -775,7 +734,7 @@ GcsOptions GcsOptions::FromImpersonatedServiceAccount(
     const GcsCredentials& base_credentials, const std::string& target_service_account) {
   GcsOptions options{};
   options.credentials = base_credentials;
-  options.credentials.holder_ = std::make_shared<GcsCredentialsHolder>(
+  options.credentials.holder_ = std::make_shared<internal::GcsCredentialsHolder>(
       google::cloud::MakeImpersonateServiceAccountCredentials(
           base_credentials.holder_->credentials, target_service_account));
   options.credentials.target_service_account_ = target_service_account;
@@ -785,7 +744,7 @@ GcsOptions GcsOptions::FromImpersonatedServiceAccount(
 
 GcsOptions GcsOptions::FromServiceAccountCredentials(const std::string& json_object) {
   GcsOptions options{};
-  options.credentials.holder_ = std::make_shared<GcsCredentialsHolder>(
+  options.credentials.holder_ = std::make_shared<internal::GcsCredentialsHolder>(
       google::cloud::MakeServiceAccountCredentials(json_object));
   options.credentials.json_credentials_ = json_object;
   options.scheme = "https";
@@ -844,6 +803,8 @@ Result<GcsOptions> GcsOptions::FromUri(const arrow::internal::Uri& uri,
                                kv.second, "'");
       }
       options.retry_limit_seconds = parsed_seconds;
+    } else if (kv.first == "project_id") {
+      options.project_id = kv.second;
     } else {
       return Status::Invalid("Unexpected query parameter in GCS URI: '", kv.first, "'");
     }
diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h
index d4b919ec81..e4a1edfd6b 100644
--- a/cpp/src/arrow/filesystem/gcsfs.h
+++ b/cpp/src/arrow/filesystem/gcsfs.h
@@ -27,9 +27,13 @@
 
 namespace arrow {
 namespace fs {
+namespace internal {
 
 // Opaque wrapper for GCS's library credentials to avoid exposing in Arrow headers.
 struct GcsCredentialsHolder;
+
+}  // namespace internal
+
 class GcsFileSystem;
 
 /// \brief Container for GCS Credentials and information necessary to recreate them.
@@ -41,7 +45,9 @@ class ARROW_EXPORT GcsCredentials {
   TimePoint expiration() const { return expiration_; }
   const std::string& target_service_account() const { return target_service_account_; }
   const std::string& json_credentials() const { return json_credentials_; }
-  const std::shared_ptr<GcsCredentialsHolder>& holder() const { return holder_; }
+  const std::shared_ptr<internal::GcsCredentialsHolder>& holder() const {
+    return holder_;
+  }
 
  private:
   GcsCredentials() = default;
@@ -50,7 +56,7 @@ class ARROW_EXPORT GcsCredentials {
   TimePoint expiration_;
   std::string target_service_account_;
   std::string json_credentials_;
-  std::shared_ptr<GcsCredentialsHolder> holder_;
+  std::shared_ptr<internal::GcsCredentialsHolder> holder_;
   friend class GcsFileSystem;
   friend struct GcsOptions;
 };
@@ -77,6 +83,13 @@ struct ARROW_EXPORT GcsOptions {
   /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
   std::shared_ptr<const KeyValueMetadata> default_metadata;
 
+  /// \brief The project to use for creating buckets.
+  ///
+  /// If not set, the library uses the GOOGLE_CLOUD_PROJECT environment
+  /// variable. Most I/O operations do not need a project id, only applications
+  /// that create new buckets need a project id.
+  std::optional<std::string> project_id;
+
   bool Equals(const GcsOptions& other) const;
 
   /// \brief Initialize with Google Default Credentials
diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.cc b/cpp/src/arrow/filesystem/gcsfs_internal.cc
index c6b4052489..d155aec827 100644
--- a/cpp/src/arrow/filesystem/gcsfs_internal.cc
+++ b/cpp/src/arrow/filesystem/gcsfs_internal.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "arrow/filesystem/gcsfs_internal.h"
+#include "arrow/filesystem/gcsfs.h"
 
 #include <absl/time/time.h>  // NOLINT
 #include <google/cloud/storage/client.h>
@@ -306,6 +307,45 @@ std::int64_t Depth(std::string_view path) {
   return std::count(path.begin(), path.end(), fs::internal::kSep) - has_trailing_slash;
 }
 
+// Change the default upload buffer size. In general, sending larger buffers is more
+// efficient with GCS, as each buffer requires a roundtrip to the service. With formatted
+// output (when using `operator<<`), keeping a larger buffer in memory before uploading
+// makes sense.  With unformatted output (the only choice given gcs::io::OutputStream's
+// API) it is better to let the caller provide as large a buffer as they want. The GCS C++
+// client library will upload this buffer with zero copies if possible.
+auto constexpr kUploadBufferSize = 256 * 1024;
+
+google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
+  auto options = google::cloud::Options{};
+  std::string scheme = o.scheme;
+  if (scheme.empty()) scheme = "https";
+  if (scheme == "https") {
+    options.set<google::cloud::UnifiedCredentialsOption>(
+        google::cloud::MakeGoogleDefaultCredentials());
+  } else {
+    options.set<google::cloud::UnifiedCredentialsOption>(
+        google::cloud::MakeInsecureCredentials());
+  }
+  options.set<gcs::UploadBufferSizeOption>(kUploadBufferSize);
+  if (!o.endpoint_override.empty()) {
+    options.set<gcs::RestEndpointOption>(scheme + "://" + o.endpoint_override);
+  }
+  if (o.credentials.holder() && o.credentials.holder()->credentials) {
+    options.set<google::cloud::UnifiedCredentialsOption>(
+        o.credentials.holder()->credentials);
+  }
+  if (o.retry_limit_seconds.has_value()) {
+    options.set<gcs::RetryPolicyOption>(
+        gcs::LimitedTimeRetryPolicy(
+            std::chrono::milliseconds(static_cast<int>(*o.retry_limit_seconds * 1000)))
+            .clone());
+  }
+  if (o.project_id.has_value()) {
+    options.set<gcs::ProjectIdOption>(*o.project_id);
+  }
+  return options;
+}
+
 }  // namespace internal
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.h b/cpp/src/arrow/filesystem/gcsfs_internal.h
index c2a0e2921d..a3aed099a3 100644
--- a/cpp/src/arrow/filesystem/gcsfs_internal.h
+++ b/cpp/src/arrow/filesystem/gcsfs_internal.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <google/cloud/credentials.h>
+#include <google/cloud/options.h>
 #include <google/cloud/status.h>
 #include <google/cloud/storage/object_metadata.h>
 #include <google/cloud/storage/well_known_headers.h>
@@ -30,8 +32,17 @@
 
 namespace arrow {
 namespace fs {
+struct GcsOptions;
+
 namespace internal {
 
+struct GcsCredentialsHolder {
+  // Constructor needed for make_shared
+  explicit GcsCredentialsHolder(std::shared_ptr<google::cloud::Credentials> credentials)
+      : credentials(std::move(credentials)) {}
+  std::shared_ptr<google::cloud::Credentials> credentials;
+};
+
 ARROW_EXPORT Status ToArrowStatus(const google::cloud::Status& s);
 
 ARROW_EXPORT int ErrnoFromStatus(const google::cloud::Status& s);
@@ -53,6 +64,8 @@ ARROW_EXPORT Result<std::shared_ptr<const KeyValueMetadata>> FromObjectMetadata(
 
 ARROW_EXPORT std::int64_t Depth(std::string_view path);
 
+ARROW_EXPORT google::cloud::Options AsGoogleCloudOptions(const GcsOptions& options);
+
 }  // namespace internal
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc
index 9d5136b47c..7a8afc0ee4 100644
--- a/cpp/src/arrow/filesystem/gcsfs_test.cc
+++ b/cpp/src/arrow/filesystem/gcsfs_test.cc
@@ -211,6 +211,7 @@ class GcsIntegrationTest : public ::testing::Test {
     auto options = GcsOptions::Anonymous();
     options.endpoint_override = "127.0.0.1:" + Testbench()->port();
     options.retry_limit_seconds = 60;
+    options.project_id = "test-only-invalid-project-id";
     return options;
   }
 
@@ -373,7 +374,8 @@ TEST(GcsFileSystem, OptionsFromUri) {
       options,
       GcsOptions::FromUri("gs://mybucket/foo/bar/"
                           "?endpoint_override=localhost&scheme=http&location=us-west2"
-                          "&retry_limit_seconds=40.5",
+                          "&retry_limit_seconds=40.5"
+                          "&project_id=test-project-id",
                           &path));
   EXPECT_EQ(options.default_bucket_location, "us-west2");
   EXPECT_EQ(options.scheme, "http");
@@ -381,6 +383,8 @@ TEST(GcsFileSystem, OptionsFromUri) {
   EXPECT_EQ(path, "mybucket/foo/bar");
   ASSERT_TRUE(options.retry_limit_seconds.has_value());
   EXPECT_EQ(*options.retry_limit_seconds, 40.5);
+  ASSERT_TRUE(options.project_id.has_value());
+  EXPECT_EQ(*options.project_id, "test-project-id");
 
   // Missing bucket name
   ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs:///foo/bar/", &path));
@@ -441,6 +445,32 @@ TEST(GcsFileSystem, OptionsServiceAccountCredentials) {
   EXPECT_EQ(a.scheme, "https");
 }
 
+TEST(GcsFileSystem, OptionsAsGoogleCloudOptions) {
+  auto a = GcsOptions::Anonymous();
+  a.scheme = "http";
+  a.endpoint_override = "localhost:8080";
+  a.default_bucket_location = "us-central1";
+  a.retry_limit_seconds = 40.5;
+  a.project_id = "test-only-invalid-project-id";
+
+  auto const o1 = internal::AsGoogleCloudOptions(a);
+  EXPECT_TRUE(o1.has<google::cloud::UnifiedCredentialsOption>());
+  EXPECT_TRUE(o1.has<gcs::RetryPolicyOption>());
+  EXPECT_EQ(o1.get<gcs::RestEndpointOption>(), "http://localhost:8080");
+  EXPECT_EQ(o1.get<gcs::ProjectIdOption>(), "test-only-invalid-project-id");
+
+  a.scheme.clear();
+  a.endpoint_override.clear();
+  a.retry_limit_seconds.reset();
+  a.project_id.reset();
+
+  auto const o2 = internal::AsGoogleCloudOptions(a);
+  EXPECT_TRUE(o2.has<google::cloud::UnifiedCredentialsOption>());
+  EXPECT_FALSE(o2.has<gcs::RetryPolicyOption>());
+  EXPECT_FALSE(o2.has<gcs::RestEndpointOption>());
+  EXPECT_FALSE(o2.has<gcs::ProjectIdOption>());
+}
+
 TEST(GcsFileSystem, ToArrowStatusOK) {
   Status actual = internal::ToArrowStatus(google::cloud::Status());
   EXPECT_TRUE(actual.ok());
@@ -486,6 +516,7 @@ TEST(GcsFileSystem, ToArrowStatus) {
 TEST(GcsFileSystem, FileSystemCompare) {
   GcsOptions a_options;
   a_options.scheme = "http";
+  a_options.project_id = "test-only-invalid-project-id";
   auto a = GcsFileSystem::Make(a_options);
   EXPECT_THAT(a, NotNull());
   EXPECT_TRUE(a->Equals(*a));
@@ -493,6 +524,7 @@ TEST(GcsFileSystem, FileSystemCompare) {
   GcsOptions b_options;
   b_options.scheme = "http";
   b_options.endpoint_override = "localhost:1234";
+  b_options.project_id = "test-only-invalid-project-id";
   auto b = GcsFileSystem::Make(b_options);
   EXPECT_THAT(b, NotNull());
   EXPECT_TRUE(b->Equals(*b));