You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2023/08/11 11:57:32 UTC

[beam] branch users/damccorm/revertGcsChanges created (now abe39b3af44)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a change to branch users/damccorm/revertGcsChanges
in repository https://gitbox.apache.org/repos/asf/beam.git


      at abe39b3af44 Merge in master

This branch includes the following new commits:

     new abe39b3af44 Merge in master

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Merge in master

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/revertGcsChanges
in repository https://gitbox.apache.org/repos/asf/beam.git

commit abe39b3af44292b09a0e75b233a43b040723d1c3
Author: Danny McCormick <da...@google.com>
AuthorDate: Fri Aug 11 07:57:04 2023 -0400

    Merge in master
---
 .../assets/symbols/python.g.yaml                   |    8 +
 .../examples/complete/game/user_score.py           |    1 -
 sdks/python/apache_beam/internal/gcp/auth.py       |    7 +-
 sdks/python/apache_beam/io/gcp/bigquery_test.py    |    4 +-
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |    4 -
 .../apache_beam/io/gcp/bigquery_tools_test.py      |   18 +
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py    |   33 +-
 .../apache_beam/io/gcp/gcsfilesystem_test.py       |   17 +-
 sdks/python/apache_beam/io/gcp/gcsio.py            |  631 +++--
 .../apache_beam/io/gcp/gcsio_integration_test.py   |  180 +-
 sdks/python/apache_beam/io/gcp/gcsio_overrides.py  |   55 +
 sdks/python/apache_beam/io/gcp/gcsio_test.py       |  886 +++++--
 .../io/gcp/internal/clients/storage/__init__.py    |   33 +
 .../internal/clients/storage/storage_v1_client.py  | 1517 +++++++++++
 .../clients/storage/storage_v1_messages.py         | 2714 ++++++++++++++++++++
 .../options/pipeline_options_validator_test.py     |    1 -
 .../runners/dataflow/internal/apiclient.py         |   65 +-
 .../apache_beam/runners/interactive/utils.py       |   26 +-
 .../apache_beam/runners/interactive/utils_test.py  |   41 +-
 .../runners/portability/sdk_container_builder.py   |   41 +-
 .../container/py310/base_image_requirements.txt    |    2 +-
 .../container/py311/base_image_requirements.txt    |    2 +-
 .../container/py38/base_image_requirements.txt     |    2 +-
 .../container/py39/base_image_requirements.txt     |    2 +-
 sdks/python/mypy.ini                               |    3 +
 sdks/python/setup.py                               |    1 -
 26 files changed, 5758 insertions(+), 536 deletions(-)

diff --git a/playground/frontend/playground_components/assets/symbols/python.g.yaml b/playground/frontend/playground_components/assets/symbols/python.g.yaml
index 0b9e5e142de..a47447225a6 100644
--- a/playground/frontend/playground_components/assets/symbols/python.g.yaml
+++ b/playground/frontend/playground_components/assets/symbols/python.g.yaml
@@ -4790,6 +4790,10 @@ GBKTransform:
   - from_runner_api_parameter
   - to_runner_api_parameter
 GcpTestIOError: {}
+GcsDownloader:
+  methods:
+  - get_range
+  - size
 GCSFileSystem:
   methods:
   - checksum
@@ -4833,6 +4837,10 @@ GcsIOError: {}
 GcsIOOverrides:
   methods:
   - retry_func
+GcsUploader:
+  methods:
+  - finish
+  - put
 GeneralPurposeConsumerSet:
   methods:
   - flush
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 03f0d00fc30..564cea8c425 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -177,7 +177,6 @@ def run(argv=None, save_main_session=True):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-
     (  # pylint: disable=expression-not-assigned
         p
         | 'ReadInputText' >> beam.io.ReadFromText(args.input)
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 7e54ba0a4ba..47c3416babd 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -111,9 +111,6 @@ if _GOOGLE_AUTH_AVAILABLE:
       """Delegate attribute access to underlying google-auth credentials."""
       return getattr(self._google_auth_credentials, attr)
 
-    def get_google_auth_credentials(self):
-      return self._google_auth_credentials
-
 
 class _Credentials(object):
   _credentials_lock = threading.Lock()
@@ -122,7 +119,7 @@ class _Credentials(object):
 
   @classmethod
   def get_service_credentials(cls, pipeline_options):
-    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]
+    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
     with cls._credentials_lock:
       if cls._credentials_init:
         return cls._credentials
@@ -142,7 +139,7 @@ class _Credentials(object):
 
   @staticmethod
   def _get_service_credentials(pipeline_options):
-    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]
+    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
     if not _GOOGLE_AUTH_AVAILABLE:
       _LOGGER.warning(
           'Unable to find default credentials because the google-auth library '
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index fe1a568f414..3a3033dfcaf 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -827,7 +827,6 @@ class TestWriteToBigQuery(unittest.TestCase):
           exception_type=exceptions.ServiceUnavailable if exceptions else None,
           error_message='backendError')
   ])
-  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')
   def test_load_job_exception(self, exception_type, error_message):
 
     with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
@@ -867,7 +866,6 @@ class TestWriteToBigQuery(unittest.TestCase):
           exception_type=exceptions.InternalServerError if exceptions else None,
           error_message='internalError'),
   ])
-  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')
   def test_copy_load_job_exception(self, exception_type, error_message):
 
     from apache_beam.io.gcp import bigquery_file_loads
@@ -886,7 +884,7 @@ class TestWriteToBigQuery(unittest.TestCase):
       mock.patch.object(BigQueryWrapper,
                         'wait_for_bq_job'), \
       mock.patch('apache_beam.io.gcp.internal.clients'
-        '.storage.storage_v1_client.StorageV1.ObjectsService'),\
+        '.storage.storage_v1_client.StorageV1.ObjectsService'), \
       mock.patch('time.sleep'), \
       self.assertRaises(Exception) as exc, \
       beam.Pipeline() as p:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index ad851212245..6f55879d2a3 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -339,9 +339,6 @@ class BigQueryWrapper(object):
   offer a common place where retry logic for failures can be controlled.
   In addition, it offers various functions used both in sources and sinks
   (e.g., find and create tables, query a table, etc.).
-
-  Note that client parameter in constructor is only for testing purposes and
-  should not be used in production code.
   """
 
   # If updating following names, also update the corresponding pydocs in
@@ -356,7 +353,6 @@ class BigQueryWrapper(object):
     self.gcp_bq_client = client or gcp_bigquery.Client(
         client_info=ClientInfo(
             user_agent="apache-beam-%s" % apache_beam.__version__))
-
     self._unique_row_id = 0
     # For testing scenarios where we pass in a client we do not want a
     # randomized prefix for row IDs.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 973f2a0f740..a3e39d8e18d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -60,6 +60,7 @@ try:
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
   from google.api_core.exceptions import ClientError, DeadlineExceeded
   from google.api_core.exceptions import InternalServerError
+  import google.cloud
 except ImportError:
   ClientError = None
   DeadlineExceeded = None
@@ -223,6 +224,23 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_dataset('', '')
     self.assertTrue(client.datasets.Delete.called)
 
+  @unittest.skipIf(
+      google and not hasattr(google.cloud, '_http'),  # pylint: disable=c-extension-no-member
+      'Dependencies not installed')
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('google.cloud._http.JSONConnection.http')
+  def test_user_agent_insert_all(self, http_mock, patched_sleep):
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper()
+    try:
+      wrapper._insert_all_rows('p', 'd', 't', [{'name': 'any'}], None)
+    except:  # pylint: disable=bare-except
+      # Ignore errors. The errors come from the fact that we did not mock
+      # the response from the API, so the overall insert_all_rows call fails
+      # soon after the BQ API is called.
+      pass
+    call = http_mock.request.mock_calls[-2]
+    self.assertIn('apache-beam-', call[2]['headers']['User-Agent'])
+
   @mock.patch('time.sleep', return_value=None)
   def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
     client = mock.Mock()
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index f40509493c1..c87a8499c91 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -254,13 +254,24 @@ class GCSFileSystem(FileSystem):
       gcs_batches.append(gcs_current_batch)
 
     # Execute GCS renames if any and return exceptions.
-    try:
-      for batch in gcs_batches:
-        self._gcsIO().copy_batch(batch)
-        self._gcsIO().delete_batch(source_file_names)
+    exceptions = {}
+    for batch in gcs_batches:
+      copy_statuses = self._gcsIO().copy_batch(batch)
+      copy_succeeded = []
+      for src, dest, exception in copy_statuses:
+        if exception:
+          exceptions[(src, dest)] = exception
+        else:
+          copy_succeeded.append((src, dest))
+      delete_batch = [src for src, dest in copy_succeeded]
+      delete_statuses = self._gcsIO().delete_batch(delete_batch)
+      for i, (src, exception) in enumerate(delete_statuses):
+        dest = copy_succeeded[i][1]
+        if exception:
+          exceptions[(src, dest)] = exception
 
-    except Exception as exception:
-      raise BeamIOError("Rename operation failed", exception)
+    if exceptions:
+      raise BeamIOError("Rename operation failed", exceptions)
 
   def exists(self, path):
     """Check if the provided path exists on the FileSystem.
@@ -329,7 +340,8 @@ class GCSFileSystem(FileSystem):
     """
     try:
       file_metadata = self._gcsIO()._status(path)
-      return FileMetadata(path, file_metadata['size'], file_metadata['updated'])
+      return FileMetadata(
+          path, file_metadata['size'], file_metadata['last_updated'])
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Metadata operation failed", {path: e})
 
@@ -348,7 +360,12 @@ class GCSFileSystem(FileSystem):
       else:
         path_to_use = path
       match_result = self.match([path_to_use])[0]
-      self._gcsIO().delete_batch([m.path for m in match_result.metadata_list])
+      statuses = self._gcsIO().delete_batch(
+          [m.path for m in match_result.metadata_list])
+      # pylint: disable=used-before-assignment
+      failures = [e for (_, e) in statuses if e is not None]
+      if failures:
+        raise failures[0]
 
     exceptions = {}
     for path in paths:
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index c3ca88a1643..800bd5d1c46 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -272,7 +272,8 @@ class GCSFileSystemTest(unittest.TestCase):
         'gs://bucket/to2',
         'gs://bucket/to3',
     ]
-    gcsio_mock.delete_batch.side_effect = Exception("BadThings")
+    exception = IOError('Failed')
+    gcsio_mock.delete_batch.side_effect = [[(f, exception) for f in sources]]
     gcsio_mock.copy_batch.side_effect = [[
         ('gs://bucket/from1', 'gs://bucket/to1', None),
         ('gs://bucket/from2', 'gs://bucket/to2', None),
@@ -280,8 +281,16 @@ class GCSFileSystemTest(unittest.TestCase):
     ]]
 
     # Issue batch rename.
-    with self.assertRaisesRegex(BeamIOError, r'^Rename operation failed'):
+    expected_results = {
+        (s, d): exception
+        for s, d in zip(sources, destinations)
+    }
+
+    # Issue batch rename.
+    with self.assertRaisesRegex(BeamIOError,
+                                r'^Rename operation failed') as error:
       self.fs.rename(sources, destinations)
+    self.assertEqual(error.exception.exception_details, expected_results)
 
     gcsio_mock.copy_batch.assert_called_once_with([
         ('gs://bucket/from1', 'gs://bucket/to1'),
@@ -299,7 +308,7 @@ class GCSFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
-    gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0}
+    gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -317,7 +326,7 @@ class GCSFileSystemTest(unittest.TestCase):
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.delete_batch.side_effect = exception
-    gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0}
+    gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 980e7dc3a4a..2fdbce73170 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -29,18 +29,28 @@ https://github.com/apache/beam/blob/master/sdks/python/OWNERS
 
 # pytype: skip-file
 
+import errno
+import io
 import logging
+import multiprocessing
 import re
+import threading
 import time
+import traceback
+from itertools import islice
 from typing import Optional
 from typing import Union
 
-from google.cloud import storage
-from google.cloud.exceptions import NotFound
-from google.cloud.storage.fileio import BlobReader
-from google.cloud.storage.fileio import BlobWriter
-
-from apache_beam.internal.gcp import auth
+import apache_beam
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.internal.metrics.metric import ServiceCallMetric
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import PipeStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.io.gcp import resource_identifiers
+from apache_beam.metrics import monitoring_infos
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import retry
 from apache_beam.utils.annotations import deprecated
@@ -49,11 +59,58 @@ __all__ = ['GcsIO']
 
 _LOGGER = logging.getLogger(__name__)
 
+# Issue a friendlier error message if the storage library is not available.
+# TODO(silviuc): Remove this guard when storage is available everywhere.
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from apitools.base.py.batch import BatchApiRequest
+  from apitools.base.py.exceptions import HttpError
+  from apitools.base.py import transfer
+  from apache_beam.internal.gcp import auth
+  from apache_beam.io.gcp.internal.clients import storage
+except ImportError:
+  raise ImportError(
+      'Google Cloud Storage I/O not supported for this execution environment '
+      '(could not import storage API client).')
+
+# This is the size of each partial-file read operation from GCS.  This
+# parameter was chosen to give good throughput while keeping memory usage at
+# a reasonable level; the following table shows throughput reached when
+# reading files of a given size with a chosen buffer size and informed the
+# choice of the value, as of 11/2016:
+#
+# +---------------+------------+-------------+-------------+-------------+
+# |               | 50 MB file | 100 MB file | 200 MB file | 400 MB file |
+# +---------------+------------+-------------+-------------+-------------+
+# | 8 MB buffer   | 17.12 MB/s | 22.67 MB/s  | 23.81 MB/s  | 26.05 MB/s  |
+# | 16 MB buffer  | 24.21 MB/s | 42.70 MB/s  | 42.89 MB/s  | 46.92 MB/s  |
+# | 32 MB buffer  | 28.53 MB/s | 48.08 MB/s  | 54.30 MB/s  | 54.65 MB/s  |
+# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s  | 79.13 MB/s  | 85.39 MB/s  |
+# +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
+# This is the number of seconds the library will wait for a partial-file read
+# operation from GCS to complete before retrying.
+DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
+
+# This is the size of chunks used when writing to GCS.
+WRITE_CHUNK_SIZE = 8 * 1024 * 1024
+
 # Maximum number of operations permitted in GcsIO.copy_batch() and
 # GcsIO.delete_batch().
-MAX_BATCH_OPERATION_SIZE = 1000
+MAX_BATCH_OPERATION_SIZE = 100
+
+# Batch endpoint URL for GCS.
+# We have to specify an API specific endpoint here since Google APIs global
+# batch endpoints will be deprecated on 03/25/2019.
+# See https://developers.googleblog.com/2018/03/discontinuing-support-for-json-rpc-and.html.  # pylint: disable=line-too-long
+# Currently apitools library uses a global batch endpoint by default:
+# https://github.com/google/apitools/blob/master/apitools/base/py/batch.py#L152
+# TODO: remove this constant and it's usage after apitools move to using an API
+# specific batch endpoint or after Beam gcsio module start using a GCS client
+# library that does not use global batch endpoints.
+GCS_BATCH_ENDPOINT = 'https://www.googleapis.com/batch/storage/v1'
 
 
 def parse_gcs_path(gcs_path, object_optional=False):
@@ -97,21 +154,29 @@ def get_or_create_default_gcs_bucket(options):
         bucket_name, project, location=region)
 
 
+class GcsIOError(IOError, retry.PermanentException):
+  """GCS IO error that should not be retried."""
+  pass
+
+
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None
+    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None
     if storage_client is None:
       if not pipeline_options:
         pipeline_options = PipelineOptions()
       elif isinstance(pipeline_options, dict):
         pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
-      credentials = auth.get_service_credentials(pipeline_options)
-      if credentials:
-        storage_client = storage.Client(
-            credentials=credentials.get_google_auth_credentials())
-      else:
-        storage_client = storage.Client.create_anonymous_client()
+      storage_client = storage.StorageV1(
+          credentials=auth.get_service_credentials(pipeline_options),
+          get_credentials=False,
+          http=get_new_http(),
+          response_encoding='utf8',
+          additional_http_headers={
+              "User-Agent": "apache-beam/%s (GPN:Beam)" %
+              apache_beam.__version__
+          })
     self.client = storage_client
     self._rewrite_cb = None
     self.bucket_to_project_number = {}
@@ -121,30 +186,40 @@ class GcsIO(object):
       bucket_metadata = self.get_bucket(bucket_name=bucket)
       if bucket_metadata:
         self.bucket_to_project_number[bucket] = bucket_metadata.projectNumber
+      #  else failed to load the bucket metadata due to HttpError
 
     return self.bucket_to_project_number.get(bucket, None)
 
+  def _set_rewrite_response_callback(self, callback):
+    """For testing purposes only. No backward compatibility guarantees.
+
+    Args:
+      callback: A function that receives ``storage.RewriteResponse``.
+    """
+    self._rewrite_cb = callback
+
   def get_bucket(self, bucket_name):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      return self.client.lookup_bucket(bucket_name)
-    except NotFound:
+      request = storage.StorageBucketsGetRequest(bucket=bucket_name)
+      return self.client.buckets.Get(request)
+    except HttpError:
       return None
 
   def create_bucket(self, bucket_name, project, kms_key=None, location=None):
     """Create and return a GCS bucket in a specific project."""
-
+    encryption = None
+    if kms_key:
+      encryption = storage.Bucket.EncryptionValue(kms_key)
+
+    request = storage.StorageBucketsInsertRequest(
+        bucket=storage.Bucket(
+            name=bucket_name, location=location, encryption=encryption),
+        project=project,
+    )
     try:
-      bucket = self.client.create_bucket(
-          bucket_or_name=bucket_name,
-          project=project,
-          location=location,
-      )
-      if kms_key:
-        bucket.default_kms_key_name(kms_key)
-        bucket.patch()
-      return bucket
-    except NotFound:
+      return self.client.buckets.Insert(request)
+    except HttpError:
       return None
 
   def open(
@@ -167,18 +242,24 @@ class GcsIO(object):
     Raises:
       ValueError: Invalid open file mode.
     """
-    bucket_name, blob_name = parse_gcs_path(filename)
-    bucket = self.client.get_bucket(bucket_name)
-
     if mode == 'r' or mode == 'rb':
-      blob = bucket.get_blob(blob_name)
-      return BeamBlobReader(blob, chunk_size=read_buffer_size)
+      downloader = GcsDownloader(
+          self.client,
+          filename,
+          buffer_size=read_buffer_size,
+          get_project_number=self.get_project_number)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
     elif mode == 'w' or mode == 'wb':
-      blob = bucket.get_blob(blob_name)
-      if not blob:
-        blob = storage.Blob(blob_name, bucket)
-      return BeamBlobWriter(blob, mime_type)
-
+      uploader = GcsUploader(
+          self.client,
+          filename,
+          mime_type,
+          get_project_number=self.get_project_number)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
     else:
       raise ValueError('Invalid file open mode: %s.' % mode)
 
@@ -190,13 +271,19 @@ class GcsIO(object):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket_name, blob_name = parse_gcs_path(path)
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsDeleteRequest(
+        bucket=bucket, object=object_path)
     try:
-      bucket = self.client.get_bucket(bucket_name)
-      bucket.delete_blob(blob_name)
-    except NotFound:
-      return
-
+      self.client.objects.Delete(request)
+    except HttpError as http_error:
+      if http_error.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      raise
+
+  # We intentionally do not decorate this method with a retry, as retrying is
+  # handled in BatchApiRequest.Execute().
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
@@ -204,96 +291,166 @@ class GcsIO(object):
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
 
-    Returns: List of tuples of (path, exception) in the same order as the
-             paths argument, where exception is None if the operation
-             succeeded or the relevant exception if the operation failed.
+    Returns: List of tuples of (path, exception) in the same order as the paths
+             argument, where exception is None if the operation succeeded or
+             the relevant exception if the operation failed.
     """
-    final_results = []
-    s = 0
-    while s < len(paths):
-      if (s + MAX_BATCH_OPERATION_SIZE) < len(paths):
-        current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
-      else:
-        current_paths = paths[s:]
-      current_batch = self.client.batch(raise_exception=False)
-      with current_batch:
-        for path in current_paths:
-          bucket_name, blob_name = parse_gcs_path(path)
-          bucket = self.client.get_bucket(bucket_name)
-          bucket.delete_blob(blob_name)
-
-      for path, resp in list(zip(current_paths, current_batch._responses)):
-        if resp.status_code == 404:
-          final_results.append((path, 200))
-        else:
-          final_results.append((path, resp.status_code))
-
-      s += MAX_BATCH_OPERATION_SIZE
-
-    return final_results
+    if not paths:
+      return []
+
+    paths = iter(paths)
+    result_statuses = []
+    while True:
+      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
+      if not paths_chunk:
+        return result_statuses
+      batch_request = BatchApiRequest(
+          batch_url=GCS_BATCH_ENDPOINT,
+          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
+          response_encoding='utf-8')
+      for path in paths_chunk:
+        bucket, object_path = parse_gcs_path(path)
+        request = storage.StorageObjectsDeleteRequest(
+            bucket=bucket, object=object_path)
+        batch_request.Add(self.client.objects, 'Delete', request)
+      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+      for i, api_call in enumerate(api_calls):
+        path = paths_chunk[i]
+        exception = None
+        if api_call.is_error:
+          exception = api_call.exception
+          # Return success when the file doesn't exist anymore for idempotency.
+          if isinstance(exception, HttpError) and exception.status_code == 404:
+            exception = None
+        result_statuses.append((path, exception))
+    return result_statuses
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(self, src, dest):
+  def copy(
+      self,
+      src,
+      dest,
+      dest_kms_key_name=None,
+      max_bytes_rewritten_per_call=None):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite API call will return after these many bytes.
+        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket_name, src_blob_name = parse_gcs_path(src)
-    dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True)
-    src_bucket = self.get_bucket(src_bucket_name)
-    src_blob = src_bucket.get_blob(src_blob_name)
-    if not src_blob:
-      raise NotFound("Source %s not found", src)
-    dest_bucket = self.get_bucket(dest_bucket_name)
-    if not dest_blob_name:
-      dest_blob_name = None
-    src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name)
-
-  def copy_batch(self, src_dest_pairs):
-    """Copies the given GCS objects from src to dest.
+    src_bucket, src_path = parse_gcs_path(src)
+    dest_bucket, dest_path = parse_gcs_path(dest)
+    request = storage.StorageObjectsRewriteRequest(
+        sourceBucket=src_bucket,
+        sourceObject=src_path,
+        destinationBucket=dest_bucket,
+        destinationObject=dest_path,
+        destinationKmsKeyName=dest_kms_key_name,
+        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+    response = self.client.objects.Rewrite(request)
+    while not response.done:
+      _LOGGER.debug(
+          'Rewrite progress: %d of %d bytes, %s to %s',
+          response.totalBytesRewritten,
+          response.objectSize,
+          src,
+          dest)
+      request.rewriteToken = response.rewriteToken
+      response = self.client.objects.Rewrite(request)
+      if self._rewrite_cb is not None:
+        self._rewrite_cb(response)
+
+    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
+
+  # We intentionally do not decorate this method with a retry, as retrying is
+  # handled in BatchApiRequest.Execute().
+  def copy_batch(
+      self,
+      src_dest_pairs,
+      dest_kms_key_name=None,
+      max_bytes_rewritten_per_call=None):
+    """Copies the given GCS object from src to dest.
 
     Args:
       src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
                       paths to copy from src to dest, not to exceed
                       MAX_BATCH_OPERATION_SIZE in length.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite call will return after these many bytes. Used
+        primarily for testing.
 
     Returns: List of tuples of (src, dest, exception) in the same order as the
              src_dest_pairs argument, where exception is None if the operation
              succeeded or the relevant exception if the operation failed.
     """
-    final_results = []
-    s = 0
-    while s < len(src_dest_pairs):
-      if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs):
-        current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE]
-      else:
-        current_pairs = src_dest_pairs[s:]
-      current_batch = self.client.batch(raise_exception=False)
-      with current_batch:
-        for pair in current_pairs:
-          src_bucket_name, src_blob_name = parse_gcs_path(pair[0])
-          dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1])
-          src_bucket = self.client.get_bucket(src_bucket_name)
-          src_blob = src_bucket.get_blob(src_blob_name)
-          dest_bucket = self.client.get_bucket(dest_bucket_name)
-
-          src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name)
-
-      for pair, resp in list(zip(current_pairs, current_batch._responses)):
-        final_results.append((pair[0], pair[1], resp.status_code))
-
-      s += MAX_BATCH_OPERATION_SIZE
+    if not src_dest_pairs:
+      return []
+    pair_to_request = {}
+    for pair in src_dest_pairs:
+      src_bucket, src_path = parse_gcs_path(pair[0])
+      dest_bucket, dest_path = parse_gcs_path(pair[1])
+      request = storage.StorageObjectsRewriteRequest(
+          sourceBucket=src_bucket,
+          sourceObject=src_path,
+          destinationBucket=dest_bucket,
+          destinationObject=dest_path,
+          destinationKmsKeyName=dest_kms_key_name,
+          maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+      pair_to_request[pair] = request
+    pair_to_status = {}
+    while True:
+      pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
+      if not pairs_in_batch:
+        break
+      batch_request = BatchApiRequest(
+          batch_url=GCS_BATCH_ENDPOINT,
+          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
+          response_encoding='utf-8')
+      for pair in pairs_in_batch:
+        batch_request.Add(self.client.objects, 'Rewrite', pair_to_request[pair])
+      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+      for pair, api_call in zip(pairs_in_batch, api_calls):
+        src, dest = pair
+        response = api_call.response
+        if self._rewrite_cb is not None:
+          self._rewrite_cb(response)
+        if api_call.is_error:
+          exception = api_call.exception
+          # Translate 404 to the appropriate not found exception.
+          if isinstance(exception, HttpError) and exception.status_code == 404:
+            exception = (
+                GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
+          pair_to_status[pair] = exception
+        elif not response.done:
+          _LOGGER.debug(
+              'Rewrite progress: %d of %d bytes, %s to %s',
+              response.totalBytesRewritten,
+              response.objectSize,
+              src,
+              dest)
+          pair_to_request[pair].rewriteToken = response.rewriteToken
+        else:
+          _LOGGER.debug('Rewrite done: %s to %s', src, dest)
+          pair_to_status[pair] = None
 
-    return final_results
+    return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
 
   # We intentionally do not decorate this method with a retry, since the
-  # underlying copy and delete operations are already idempotent operations.
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
   def copytree(self, src, dest):
     """Renames the given GCS "directory" recursively from src to dest.
 
@@ -308,7 +465,8 @@ class GcsIO(object):
       self.copy(entry, dest + rel_path)
 
   # We intentionally do not decorate this method with a retry, since the
-  # underlying copy and delete operations are already idempotent operations.
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
   def rename(self, src, dest):
     """Renames the given GCS object from src to dest.
 
@@ -326,10 +484,15 @@ class GcsIO(object):
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
     try:
-      self._gcs_object(path)
+      self._gcs_object(path)  # gcs object
       return True
-    except NotFound:
-      return False
+    except HttpError as http_error:
+      if http_error.status_code == 404:
+        # HTTP 404 indicates that the file did not exist
+        return False
+      else:
+        # We re-raise all other exceptions
+        raise
 
   def checksum(self, path):
     """Looks up the checksum of a GCS object.
@@ -358,7 +521,7 @@ class GcsIO(object):
     Returns: KMS key name of the GCS object as a string, or None if it doesn't
       have one.
     """
-    return self._gcs_object(path).kms_key_name
+    return self._gcs_object(path).kmsKeyName
 
   def last_updated(self, path):
     """Returns the last updated epoch time of a single GCS object.
@@ -385,10 +548,10 @@ class GcsIO(object):
     file_status = {}
     if hasattr(gcs_object, 'crc32c'):
       file_status['checksum'] = gcs_object.crc32c
-    if hasattr(gcs_object, 'kms_key_name'):
-      file_status['kms_key'] = gcs_object.kms_key_name
+    if hasattr(gcs_object, 'kmsKeyName'):
+      file_status['kms_key'] = gcs_object.kmsKeyName
     if hasattr(gcs_object, 'updated'):
-      file_status['updated'] = self._updated_to_seconds(gcs_object.updated)
+      file_status['last_updated'] = self._updated_to_seconds(gcs_object.updated)
     if hasattr(gcs_object, 'size'):
       file_status['size'] = gcs_object.size
     return file_status
@@ -403,13 +566,10 @@ class GcsIO(object):
 
     Returns: GCS object.
     """
-    bucket_name, blob_name = parse_gcs_path(path)
-    bucket = self.client.get_bucket(bucket_name)
-    blob = bucket.get_blob(blob_name)
-    if blob:
-      return blob
-    else:
-      raise NotFound('Object %s not found', path)
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsGetRequest(
+        bucket=bucket, object=object_path)
+    return self.client.objects.Get(request)
 
   @deprecated(since='2.45.0', current='list_files')
   def list_prefix(self, path, with_metadata=False):
@@ -444,7 +604,8 @@ class GcsIO(object):
       ``with_metadata`` is True: generator of
       tuple(file name, tuple(size, timestamp)).
     """
-    bucket_name, prefix = parse_gcs_path(path, object_optional=True)
+    bucket, prefix = parse_gcs_path(path, object_optional=True)
+    request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
     file_info = set()
     counter = 0
     start_time = time.time()
@@ -452,26 +613,35 @@ class GcsIO(object):
       _LOGGER.debug("Starting the file information of the input")
     else:
       _LOGGER.debug("Starting the size estimation of the input")
-    bucket = self.client.get_bucket(bucket_name)
-    response = self.client.list_blobs(bucket, prefix=prefix)
-    for item in response:
-      file_name = 'gs://%s/%s' % (item.bucket.name, item.name)
-      if file_name not in file_info:
-        file_info.add(file_name)
-        counter += 1
-        if counter % 10000 == 0:
+    while True:
+      response = retry.with_exponential_backoff(
+          retry_filter=retry.retry_on_server_errors_and_timeout_filter)(
+              self.client.objects.List)(
+                  request)
+
+      for item in response.items:
+        file_name = 'gs://%s/%s' % (item.bucket, item.name)
+        if file_name not in file_info:
+          file_info.add(file_name)
+          counter += 1
+          if counter % 10000 == 0:
+            if with_metadata:
+              _LOGGER.info(
+                  "Finished computing file information of: %s files",
+                  len(file_info))
+            else:
+              _LOGGER.info(
+                  "Finished computing size of: %s files", len(file_info))
+
           if with_metadata:
-            _LOGGER.info(
-                "Finished computing file information of: %s files",
-                len(file_info))
+            yield file_name, (item.size, self._updated_to_seconds(item.updated))
           else:
-            _LOGGER.info("Finished computing size of: %s files", len(file_info))
-
-        if with_metadata:
-          yield file_name, (item.size, self._updated_to_seconds(item.updated))
-        else:
-          yield file_name, item.size
+            yield file_name, item.size
 
+      if response.nextPageToken:
+        request.pageToken = response.nextPageToken
+      else:
+        break
     _LOGGER.log(
         # do not spam logs when list_prefix is likely used to check empty folder
         logging.INFO if counter > 0 else logging.DEBUG,
@@ -487,18 +657,173 @@ class GcsIO(object):
         updated.microsecond / 1000000.0)
 
 
-class BeamBlobReader(BlobReader):
-  def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE):
-    super().__init__(blob, chunk_size=chunk_size)
-    self.mode = "r"
+class GcsDownloader(Downloader):
+  def __init__(self, client, path, buffer_size, get_project_number):
+    self._client = client
+    self._path = path
+    self._bucket, self._name = parse_gcs_path(path)
+    self._buffer_size = buffer_size
+    self._get_project_number = get_project_number
+
+    # Create a request count metric
+    resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.get',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: self._bucket
+    }
+    project_number = self._get_project_number(self._bucket)
+    if project_number:
+      labels[monitoring_infos.GCS_PROJECT_ID_LABEL] = str(project_number)
+    else:
+      _LOGGER.debug(
+          'Possibly missing storage.buckets.get permission to '
+          'bucket %s. Label %s is not added to the counter because it '
+          'cannot be identified.',
+          self._bucket,
+          monitoring_infos.GCS_PROJECT_ID_LABEL)
+
+    service_call_metric = ServiceCallMetric(
+        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        base_labels=labels)
+
+    # Get object state.
+    self._get_request = (
+        storage.StorageObjectsGetRequest(
+            bucket=self._bucket, object=self._name))
+    try:
+      metadata = self._get_object_metadata(self._get_request)
+    except HttpError as http_error:
+      service_call_metric.call(http_error)
+      if http_error.status_code == 404:
+        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
+      else:
+        _LOGGER.error(
+            'HTTP error while requesting file %s: %s', self._path, http_error)
+        raise
+    else:
+      service_call_metric.call('ok')
 
+    self._size = metadata.size
 
-class BeamBlobWriter(BlobWriter):
-  def __init__(
-      self, blob, content_type, chunk_size=16 * 1024 * 1024, ignore_flush=True):
-    super().__init__(
-        blob,
-        content_type=content_type,
-        chunk_size=chunk_size,
-        ignore_flush=ignore_flush)
-    self.mode = "w"
+    # Ensure read is from file of the correct generation.
+    self._get_request.generation = metadata.generation
+
+    # Initialize read buffer state.
+    self._download_stream = io.BytesIO()
+    self._downloader = transfer.Download(
+        self._download_stream,
+        auto_transfer=False,
+        chunksize=self._buffer_size,
+        num_retries=20)
+
+    try:
+      self._client.objects.Get(self._get_request, download=self._downloader)
+      service_call_metric.call('ok')
+    except HttpError as e:
+      service_call_metric.call(e)
+      raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _get_object_metadata(self, get_request):
+    return self._client.objects.Get(get_request)
+
+  @property
+  def size(self):
+    return self._size
+
+  def get_range(self, start, end):
+    self._download_stream.seek(0)
+    self._download_stream.truncate(0)
+    self._downloader.GetRange(start, end - 1)
+    return self._download_stream.getvalue()
+
+
+class GcsUploader(Uploader):
+  def __init__(self, client, path, mime_type, get_project_number):
+    self._client = client
+    self._path = path
+    self._bucket, self._name = parse_gcs_path(path)
+    self._mime_type = mime_type
+    self._get_project_number = get_project_number
+
+    # Set up communication with child thread.
+    parent_conn, child_conn = multiprocessing.Pipe()
+    self._child_conn = child_conn
+    self._conn = parent_conn
+
+    # Set up uploader.
+    self._insert_request = (
+        storage.StorageObjectsInsertRequest(
+            bucket=self._bucket, name=self._name))
+    self._upload = transfer.Upload(
+        PipeStream(self._child_conn),
+        self._mime_type,
+        chunksize=WRITE_CHUNK_SIZE)
+    self._upload.strategy = transfer.RESUMABLE_UPLOAD
+
+    # Start uploading thread.
+    self._upload_thread = threading.Thread(target=self._start_upload)
+    self._upload_thread.daemon = True
+    self._upload_thread.last_error = None
+    self._upload_thread.start()
+
+  # TODO(silviuc): Refactor so that retry logic can be applied.
+  # There is retry logic in the underlying transfer library but we should make
+  # it more explicit so we can control the retry parameters.
+  @retry.no_retries  # Using no_retries marks this as an integration point.
+  def _start_upload(self):
+    # This starts the uploader thread.  We are forced to run the uploader in
+    # another thread because the apitools uploader insists on taking a stream
+    # as input. Happily, this also means we get asynchronous I/O to GCS.
+    #
+    # The uploader by default transfers data in chunks of 1024 * 1024 bytes at
+    # a time, buffering writes until that size is reached.
+
+    project_number = self._get_project_number(self._bucket)
+
+    # Create a request count metric
+    resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.insert',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: self._bucket,
+        monitoring_infos.GCS_PROJECT_ID_LABEL: str(project_number)
+    }
+    service_call_metric = ServiceCallMetric(
+        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        base_labels=labels)
+    try:
+      self._client.objects.Insert(self._insert_request, upload=self._upload)
+      service_call_metric.call('ok')
+    except Exception as e:  # pylint: disable=broad-except
+      service_call_metric.call(e)
+      _LOGGER.error(
+          'Error in _start_upload while inserting file %s: %s',
+          self._path,
+          traceback.format_exc())
+      self._upload_thread.last_error = e
+    finally:
+      self._child_conn.close()
+
+  def put(self, data):
+    try:
+      self._conn.send_bytes(data.tobytes())
+    except EOFError:
+      if self._upload_thread.last_error is not None:
+        raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
+      raise
+
+  def finish(self):
+    self._conn.close()
+    # TODO(udim): Add timeout=DEFAULT_HTTP_TIMEOUT_SECONDS * 2 and raise if
+    # isAlive is True.
+    self._upload_thread.join()
+    # Check for exception since the last put() call.
+    if self._upload_thread.last_error is not None:
+      e = self._upload_thread.last_error
+      raise type(self._upload_thread.last_error)(
+          "Error while uploading file %s" % self._path) from e  # pylint: disable=raising-bad-type
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
index cfd5321d1cc..4ffbea0ba02 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
@@ -20,6 +20,21 @@
 Runs tests against Google Cloud Storage service.
 Instantiates a TestPipeline to get options such as GCP project name, but
 doesn't actually start a Beam pipeline or test any specific runner.
+
+Options:
+  --kms_key_name=projects/<project-name>/locations/<region>/keyRings/\
+      <key-ring-name>/cryptoKeys/<key-name>/cryptoKeyVersions/<version>
+    Pass a Cloud KMS key name to test GCS operations using customer managed
+    encryption keys (CMEK).
+
+Cloud KMS permissions:
+The project's Cloud Storage service account requires Encrypter/Decrypter
+permissions for the key specified in --kms_key_name.
+
+To run these tests manually:
+  ./gradlew :sdks:python:test-suites:dataflow:integrationTest \
+    -Dtests=apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest \
+    -DkmsKeyName=KMS_KEY_NAME
 """
 
 # pytype: skip-file
@@ -43,6 +58,9 @@ except ImportError:
 class GcsIOIntegrationTest(unittest.TestCase):
 
   INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt'
+  # Larger than 1MB to test maxBytesRewrittenPerCall.
+  # Also needs to be in a different region than the dest to take effect.
+  INPUT_FILE_LARGE = 'gs://apache-beam-samples-us-east1/wikipedia_edits/wiki_data-000000000000.json'  # pylint: disable=line-too-long
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
@@ -56,86 +74,126 @@ class GcsIOIntegrationTest(unittest.TestCase):
     self.gcs_tempdir = (
         self.test_pipeline.get_option('temp_location') + '/gcs_it-' +
         str(uuid.uuid4()))
+    self.kms_key_name = self.test_pipeline.get_option('kms_key_name')
     self.gcsio = gcsio.GcsIO()
 
   def tearDown(self):
     FileSystems.delete([self.gcs_tempdir + '/'])
 
-  def _verify_copy(self, src, dest, dest_kms_key_name=None):
-    self.assertTrue(
-        FileSystems.exists(src), 'source file does not exist: %s' % src)
-    self.assertTrue(
-        FileSystems.exists(dest),
-        'copied file not present in destination: %s' % dest)
+  def _verify_copy(self, src, dst, dst_kms_key_name=None):
+    self.assertTrue(FileSystems.exists(src), 'src does not exist: %s' % src)
+    self.assertTrue(FileSystems.exists(dst), 'dst does not exist: %s' % dst)
     src_checksum = self.gcsio.checksum(src)
-    dest_checksum = self.gcsio.checksum(dest)
-    self.assertEqual(src_checksum, dest_checksum)
-    actual_dest_kms_key = self.gcsio.kms_key(dest)
-    if actual_dest_kms_key is None:
-      self.assertEqual(actual_dest_kms_key, dest_kms_key_name)
+    dst_checksum = self.gcsio.checksum(dst)
+    self.assertEqual(src_checksum, dst_checksum)
+    actual_dst_kms_key = self.gcsio.kms_key(dst)
+    if actual_dst_kms_key is None:
+      self.assertEqual(actual_dst_kms_key, dst_kms_key_name)
     else:
       self.assertTrue(
-          actual_dest_kms_key.startswith(dest_kms_key_name),
+          actual_dst_kms_key.startswith(dst_kms_key_name),
           "got: %s, wanted startswith: %s" %
-          (actual_dest_kms_key, dest_kms_key_name))
+          (actual_dst_kms_key, dst_kms_key_name))
+
+  def _test_copy(
+      self,
+      name,
+      kms_key_name=None,
+      max_bytes_rewritten_per_call=None,
+      src=None):
+    src = src or self.INPUT_FILE
+    dst = self.gcs_tempdir + '/%s' % name
+    extra_kwargs = {}
+    if max_bytes_rewritten_per_call is not None:
+      extra_kwargs['max_bytes_rewritten_per_call'] = (
+          max_bytes_rewritten_per_call)
+
+    self.gcsio.copy(src, dst, kms_key_name, **extra_kwargs)
+    self._verify_copy(src, dst, kms_key_name)
 
   @pytest.mark.it_postcommit
   def test_copy(self):
-    src = self.INPUT_FILE
-    dest = self.gcs_tempdir + '/test_copy'
+    self._test_copy("test_copy")
 
-    self.gcsio.copy(src, dest)
-    self._verify_copy(src, dest)
+  @pytest.mark.it_postcommit
+  def test_copy_kms(self):
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+    self._test_copy("test_copy_kms", self.kms_key_name)
 
   @pytest.mark.it_postcommit
-  def test_batch_copy_and_delete(self):
+  def test_copy_rewrite_token(self):
+    # Tests a multi-part copy (rewrite) operation. This is triggered by a
+    # combination of 3 conditions:
+    #  - a large enough src
+    #  - setting max_bytes_rewritten_per_call
+    #  - setting kms_key_name
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+
+    rewrite_responses = []
+    self.gcsio._set_rewrite_response_callback(
+        lambda response: rewrite_responses.append(response))
+    self._test_copy(
+        "test_copy_rewrite_token",
+        kms_key_name=self.kms_key_name,
+        max_bytes_rewritten_per_call=50 * 1024 * 1024,
+        src=self.INPUT_FILE_LARGE)
+    # Verify that there was a multi-part rewrite.
+    self.assertTrue(any(not r.done for r in rewrite_responses))
+
+  def _test_copy_batch(
+      self,
+      name,
+      kms_key_name=None,
+      max_bytes_rewritten_per_call=None,
+      src=None):
     num_copies = 10
-    srcs = [self.INPUT_FILE] * num_copies
-    dests = [
-        self.gcs_tempdir + '/test_copy_batch_%d' % i for i in range(num_copies)
-    ]
-    src_dest_pairs = list(zip(srcs, dests))
-
-    copy_results = self.gcsio.copy_batch(src_dest_pairs)
-
-    self.assertEqual(len(copy_results), len(src_dest_pairs))
-
-    for pair, result in list(zip(src_dest_pairs, copy_results)):
-      self._verify_copy(pair[0], pair[1])
-      self.assertEqual(
-          pair[0],
-          result[0],
-          'copy source %s does not match %s' % (pair[0], str(result)))
-      self.assertEqual(
-          pair[1],
-          result[1],
-          'copy destination %s does not match %s' % (pair[1], result[1]))
-      self.assertTrue(
-          (result[2] < 300),
-          'response code %s indicates that copy operation did not succeed' %
-          result[2])
-
-    delete_results = self.gcsio.delete_batch(dests)
+    srcs = [src or self.INPUT_FILE] * num_copies
+    dsts = [self.gcs_tempdir + '/%s_%d' % (name, i) for i in range(num_copies)]
+    src_dst_pairs = list(zip(srcs, dsts))
+    extra_kwargs = {}
+    if max_bytes_rewritten_per_call is not None:
+      extra_kwargs['max_bytes_rewritten_per_call'] = (
+          max_bytes_rewritten_per_call)
+
+    result_statuses = self.gcsio.copy_batch(
+        src_dst_pairs, kms_key_name, **extra_kwargs)
+    for status in result_statuses:
+      self.assertIsNone(status[2], status)
+    for _src, _dst in src_dst_pairs:
+      self._verify_copy(_src, _dst, kms_key_name)
 
-    self.assertEqual(len(delete_results), len(dests))
-
-    for dest, result in list(zip(dests, delete_results)):
-      self.assertFalse(
-          FileSystems.exists(dest), 'deleted file still exists: %s' % dest)
-      self.assertEqual(
-          dest,
-          result[0],
-          'delete path %s does not match %s' % (dest, result[0]))
-      self.assertTrue(
-          (result[1] < 300),
-          'response code %s indicates that delete operation did not succeed' %
-          result[1])
+  @pytest.mark.it_postcommit
+  def test_copy_batch(self):
+    self._test_copy_batch("test_copy_batch")
 
-    redelete_results = self.gcsio.delete_batch(dests)
+  @pytest.mark.it_postcommit
+  def test_copy_batch_kms(self):
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+    self._test_copy_batch("test_copy_batch_kms", self.kms_key_name)
 
-    for dest, result in list(zip(dests, redelete_results)):
-      self.assertTrue((result[1] < 300),
-                      're-delete should not throw error: %s' % result[1])
+  @pytest.mark.it_postcommit
+  def test_copy_batch_rewrite_token(self):
+    # Tests a multi-part copy (rewrite) operation. This is triggered by a
+    # combination of 3 conditions:
+    #  - a large enough src
+    #  - setting max_bytes_rewritten_per_call
+    #  - setting kms_key_name
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+
+    rewrite_responses = []
+    self.gcsio._set_rewrite_response_callback(
+        lambda response: rewrite_responses.append(response))
+    self._test_copy_batch(
+        "test_copy_batch_rewrite_token",
+        kms_key_name=self.kms_key_name,
+        max_bytes_rewritten_per_call=50 * 1024 * 1024,
+        src=self.INPUT_FILE_LARGE)
+    # Verify that there was a multi-part rewrite.
+    self.assertTrue(any(not r.done for r in rewrite_responses))
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_overrides.py b/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
new file mode 100644
index 00000000000..fc06cb28f1a
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+import math
+import time
+
+from apache_beam.metrics.metric import Metrics
+from apitools.base.py import exceptions
+from apitools.base.py import http_wrapper
+from apitools.base.py import util
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class GcsIOOverrides(object):
+  """Functions for overriding Google Cloud Storage I/O client."""
+
+  _THROTTLED_SECS = Metrics.counter('StorageV1', "cumulativeThrottlingSeconds")
+
+  @classmethod
+  def retry_func(cls, retry_args):
+    # handling GCS download throttling errors (BEAM-7424)
+    if (isinstance(retry_args.exc, exceptions.BadStatusCodeError) and
+        retry_args.exc.status_code == http_wrapper.TOO_MANY_REQUESTS):
+      _LOGGER.debug(
+          'Caught GCS quota error (%s), retrying.', retry_args.exc.status_code)
+    else:
+      return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)
+
+    http_wrapper.RebuildHttpConnections(retry_args.http)
+    _LOGGER.debug(
+        'Retrying request to url %s after exception %s',
+        retry_args.http_request.url,
+        retry_args.exc)
+    sleep_seconds = util.CalculateWaitForRetry(
+        retry_args.num_retries, max_wait=retry_args.max_retry_wait)
+    cls._THROTTLED_SECS.inc(math.ceil(sleep_seconds))
+    time.sleep(sleep_seconds)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index f8b580c91c9..9cc5a9e1df0 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -18,147 +18,257 @@
 """Tests for Google Cloud Storage client."""
 # pytype: skip-file
 
+import datetime
+import errno
+import io
 import logging
 import os
+import random
+import time
 import unittest
-from datetime import datetime
+from email.message import Message
 
+import httplib2
 import mock
 
+# Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
+import apache_beam
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.metricbase import MetricName
 
 try:
-  from apache_beam.io.gcp import gcsio
-  from google.cloud.exceptions import BadRequest, NotFound
+  from apache_beam.io.gcp import gcsio, resource_identifiers
+  from apache_beam.io.gcp.internal.clients import storage
+  from apitools.base.py.exceptions import HttpError
 except ImportError:
-  NotFound = None
+  HttpError = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
 DEFAULT_GCP_PROJECT = 'apache-beam-testing'
+DEFAULT_PROJECT_NUMBER = 1
 
 
 class FakeGcsClient(object):
-  # Fake storage client.
+  # Fake storage client.  Usage in gcsio.py is client.objects.Get(...) and
+  # client.objects.Insert(...).
 
   def __init__(self):
-    self.buckets = {}
+    self.objects = FakeGcsObjects()
+    self.buckets = FakeGcsBuckets()
+    # Referenced in GcsIO.copy_batch() and GcsIO.delete_batch().
+    self._http = object()
 
-  def create_bucket(self, name):
-    self.buckets[name] = FakeBucket(self, name)
-    return self.buckets[name]
 
-  def get_bucket(self, name):
-    if name in self.buckets:
-      return self.buckets[name]
-    else:
-      raise NotFound("Bucket not found")
+class FakeFile(object):
+  def __init__(
+      self, bucket, obj, contents, generation, crc32c=None, last_updated=None):
+    self.bucket = bucket
+    self.object = obj
+    self.contents = contents
+    self.generation = generation
+    self.crc32c = crc32c
+    self.last_updated = last_updated
+
+  def get_metadata(self):
+    last_updated_datetime = None
+    if self.last_updated:
+      last_updated_datetime = datetime.datetime.utcfromtimestamp(
+          self.last_updated)
+
+    return storage.Object(
+        bucket=self.bucket,
+        name=self.object,
+        generation=self.generation,
+        size=len(self.contents),
+        crc32c=self.crc32c,
+        updated=last_updated_datetime)
 
-  def lookup_bucket(self, name):
-    if name in self.buckets:
-      return self.buckets[name]
-    else:
-      return self.create_bucket(name)
 
-  def batch(self):
+class FakeGcsBuckets(object):
+  def __init__(self):
     pass
 
-  def add_file(self, bucket, blob, contents):
-    folder = self.lookup_bucket(bucket)
-    holder = folder.lookup_blob(blob)
-    holder.contents = contents
+  def get_bucket(self, bucket):
+    return storage.Bucket(name=bucket, projectNumber=DEFAULT_PROJECT_NUMBER)
+
+  def Get(self, get_request):
+    return self.get_bucket(get_request.bucket)
 
-  def get_file(self, bucket, blob):
-    folder = self.get_bucket(bucket.name)
-    holder = folder.get_blob(blob.name)
-    return holder
 
-  def list_blobs(self, bucket_or_path, prefix=None):
-    bucket = self.get_bucket(bucket_or_path.name)
-    if not prefix:
-      return list(bucket.blobs.values())
+class FakeGcsObjects(object):
+  def __init__(self):
+    self.files = {}
+    # Store the last generation used for a given object name.  Note that this
+    # has to persist even past the deletion of the object.
+    self.last_generation = {}
+    self.list_page_tokens = {}
+    self._fail_when_getting_metadata = []
+    self._fail_when_reading = []
+
+  def add_file(
+      self, f, fail_when_getting_metadata=False, fail_when_reading=False):
+    self.files[(f.bucket, f.object)] = f
+    self.last_generation[(f.bucket, f.object)] = f.generation
+    if fail_when_getting_metadata:
+      self._fail_when_getting_metadata.append(f)
+    if fail_when_reading:
+      self._fail_when_reading.append(f)
+
+  def get_file(self, bucket, obj):
+    return self.files.get((bucket, obj), None)
+
+  def delete_file(self, bucket, obj):
+    del self.files[(bucket, obj)]
+
+  def get_last_generation(self, bucket, obj):
+    return self.last_generation.get((bucket, obj), 0)
+
+  def Get(self, get_request, download=None):  # pylint: disable=invalid-name
+    f = self.get_file(get_request.bucket, get_request.object)
+    if f is None:
+      # Failing with an HTTP 404 if file does not exist.
+      raise HttpError({'status': 404}, None, None)
+    if download is None:
+      if f in self._fail_when_getting_metadata:
+        raise HttpError({'status': 429}, None, None)
+      return f.get_metadata()
     else:
-      output = []
-      for name in list(bucket.blobs):
-        if name[0:len(prefix)] == prefix:
-          output.append(bucket.blobs[name])
-      return output
-
-
-class FakeBucket(object):
-  #Fake bucket for storing test blobs locally.
-
-  def __init__(self, client, name):
-    self.client = client
-    self.name = name
-    self.blobs = {}
-    self.default_kms_key_name = None
-    self.client.buckets[name] = self
-
-  def add_blob(self, blob):
-    self.blobs[blob.name] = blob
-
-  def create_blob(self, name):
-    return FakeBlob(name, self)
-
-  def copy_blob(self, blob, dest, new_name=None):
-    if not new_name:
-      new_name = blob.name
-    dest.blobs[new_name] = blob
-    dest.blobs[new_name].name = new_name
-    dest.blobs[new_name].bucket = dest
-    return dest.blobs[new_name]
-
-  def get_blob(self, blob_name):
-    if blob_name in self.blobs:
-      return self.blobs[blob_name]
+      if f in self._fail_when_reading:
+        raise HttpError({'status': 429}, None, None)
+      stream = download.stream
+
+      def get_range_callback(start, end):
+        if not 0 <= start <= end < len(f.contents):
+          raise ValueError(
+              'start=%d end=%d len=%s' % (start, end, len(f.contents)))
+        stream.write(f.contents[start:end + 1])
+
+      download.GetRange = get_range_callback
+
+  def Insert(self, insert_request, upload=None):  # pylint: disable=invalid-name
+    assert upload is not None
+    generation = self.get_last_generation(
+        insert_request.bucket, insert_request.name) + 1
+    f = FakeFile(insert_request.bucket, insert_request.name, b'', generation)
+
+    # Stream data into file.
+    stream = upload.stream
+    data_list = []
+    while True:
+      data = stream.read(1024 * 1024)
+      if not data:
+        break
+      data_list.append(data)
+    f.contents = b''.join(data_list)
+
+    self.add_file(f)
+
+  REWRITE_TOKEN = 'test_token'
+
+  def Rewrite(self, rewrite_request):  # pylint: disable=invalid-name
+    if rewrite_request.rewriteToken == self.REWRITE_TOKEN:
+      dest_object = storage.Object()
+      return storage.RewriteResponse(
+          done=True,
+          objectSize=100,
+          resource=dest_object,
+          totalBytesRewritten=100)
+
+    src_file = self.get_file(
+        rewrite_request.sourceBucket, rewrite_request.sourceObject)
+    if not src_file:
+      raise HttpError(
+          httplib2.Response({'status': '404'}),
+          '404 Not Found',
+          'https://fake/url')
+    generation = self.get_last_generation(
+        rewrite_request.destinationBucket,
+        rewrite_request.destinationObject) + 1
+    dest_file = FakeFile(
+        rewrite_request.destinationBucket,
+        rewrite_request.destinationObject,
+        src_file.contents,
+        generation)
+    self.add_file(dest_file)
+    time.sleep(10)  # time.sleep and time.time are mocked below.
+    return storage.RewriteResponse(
+        done=False,
+        objectSize=100,
+        rewriteToken=self.REWRITE_TOKEN,
+        totalBytesRewritten=5)
+
+  def Delete(self, delete_request):  # pylint: disable=invalid-name
+    # Here, we emulate the behavior of the GCS service in raising a 404 error
+    # if this object already exists.
+    if self.get_file(delete_request.bucket, delete_request.object):
+      self.delete_file(delete_request.bucket, delete_request.object)
     else:
-      return None
-
-  def lookup_blob(self, name):
-    if name in self.blobs:
-      return self.blobs[name]
+      raise HttpError(
+          httplib2.Response({'status': '404'}),
+          '404 Not Found',
+          'https://fake/url')
+
+  def List(self, list_request):  # pylint: disable=invalid-name
+    bucket = list_request.bucket
+    prefix = list_request.prefix or ''
+    matching_files = []
+    for file_bucket, file_name in sorted(iter(self.files)):
+      if bucket == file_bucket and file_name.startswith(prefix):
+        file_object = self.files[(file_bucket, file_name)].get_metadata()
+        matching_files.append(file_object)
+
+    # Handle pagination.
+    items_per_page = 5
+    if not list_request.pageToken:
+      range_start = 0
     else:
-      return self.create_blob(name)
-
-  def set_default_kms_key_name(self, name):
-    self.default_kms_key_name = name
-
-  def delete_blob(self, name):
-    if name in self.blobs:
-      del self.blobs[name]
-
-
-class FakeBlob(object):
-  def __init__(
-      self,
-      name,
-      bucket,
-      size=0,
-      contents=None,
-      generation=1,
-      crc32c=None,
-      kms_key_name=None,
-      updated=None,
-      fail_when_getting_metadata=False,
-      fail_when_reading=False):
-    self.name = name
-    self.bucket = bucket
-    self.size = size
-    self.contents = contents
-    self._generation = generation
-    self.crc32c = crc32c
-    self.kms_key_name = kms_key_name
-    self.updated = updated
-    self._fail_when_getting_metadata = fail_when_getting_metadata
-    self._fail_when_reading = fail_when_reading
-    self.bucket.add_blob(self)
-
-  def delete(self):
-    if self.name in self.bucket.blobs:
-      del self.bucket.blobs[self.name]
-
-
-@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
+      if list_request.pageToken not in self.list_page_tokens:
+        raise ValueError('Invalid page token.')
+      range_start = self.list_page_tokens[list_request.pageToken]
+      del self.list_page_tokens[list_request.pageToken]
+
+    result = storage.Objects(
+        items=matching_files[range_start:range_start + items_per_page])
+    if range_start + items_per_page < len(matching_files):
+      next_range_start = range_start + items_per_page
+      next_page_token = '_page_token_%s_%s_%d' % (
+          bucket, prefix, next_range_start)
+      self.list_page_tokens[next_page_token] = next_range_start
+      result.nextPageToken = next_page_token
+    return result
+
+
+class FakeApiCall(object):
+  def __init__(self, exception, response):
+    self.exception = exception
+    self.is_error = exception is not None
+    # Response for Rewrite:
+    self.response = response
+
+
+class FakeBatchApiRequest(object):
+  def __init__(self, **unused_kwargs):
+    self.operations = []
+
+  def Add(self, service, method, request):  # pylint: disable=invalid-name
+    self.operations.append((service, method, request))
+
+  def Execute(self, unused_http, **unused_kwargs):  # pylint: disable=invalid-name
+    api_calls = []
+    for service, method, request in self.operations:
+      exception = None
+      response = None
+      try:
+        response = getattr(service, method)(request)
+      except Exception as e:  # pylint: disable=broad-except
+        exception = e
+      api_calls.append(FakeApiCall(exception, response))
+    return api_calls
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestGCSPathParser(unittest.TestCase):
 
   BAD_GCS_PATHS = [
@@ -200,36 +310,34 @@ class SampleOptions(object):
     self.dataflow_kms_key = kms_key
 
 
-@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+@mock.patch.multiple(
+    'time', time=mock.MagicMock(side_effect=range(100)), sleep=mock.MagicMock())
 class TestGCSIO(unittest.TestCase):
   def _insert_random_file(
       self,
       client,
       path,
-      size=0,
+      size,
+      generation=1,
       crc32c=None,
-      kms_key_name=None,
-      updated=None,
+      last_updated=None,
       fail_when_getting_metadata=False,
       fail_when_reading=False):
-    bucket_name, blob_name = gcsio.parse_gcs_path(path)
-    bucket = client.lookup_bucket(bucket_name)
-    blob = FakeBlob(
-        blob_name,
+    bucket, name = gcsio.parse_gcs_path(path)
+    f = FakeFile(
         bucket,
-        size=size,
-        contents=os.urandom(size),
+        name,
+        os.urandom(size),
+        generation,
         crc32c=crc32c,
-        kms_key_name=kms_key_name,
-        updated=updated,
-        fail_when_getting_metadata=fail_when_getting_metadata,
-        fail_when_reading=fail_when_reading)
-    return blob
+        last_updated=last_updated)
+    client.objects.add_file(f, fail_when_getting_metadata, fail_when_reading)
+    return f
 
   def setUp(self):
     self.client = FakeGcsClient()
     self.gcs = gcsio.GcsIO(self.client)
-    self.client.create_bucket("gcsio-test")
 
   def test_default_bucket_name(self):
     self.assertEqual(
@@ -243,6 +351,16 @@ class TestGCSIO(unittest.TestCase):
                 DEFAULT_GCP_PROJECT, "us-central1", kms_key="kmskey!")),
         None)
 
+  def test_num_retries(self):
+    # BEAM-7424: update num_retries accordingly if storage_client is
+    # regenerated.
+    self.assertEqual(gcsio.GcsIO().client.num_retries, 20)
+
+  def test_retry_func(self):
+    # BEAM-7667: update retry_func accordingly if storage_client is
+    # regenerated.
+    self.assertIsNotNone(gcsio.GcsIO().client.retry_func)
+
   def test_exists(self):
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
@@ -250,16 +368,17 @@ class TestGCSIO(unittest.TestCase):
     self.assertFalse(self.gcs.exists(file_name + 'xyz'))
     self.assertTrue(self.gcs.exists(file_name))
 
-  @mock.patch.object(FakeBucket, 'get_blob')
+  @mock.patch.object(FakeGcsObjects, 'Get')
   def test_exists_failure(self, mock_get):
     # Raising an error other than 404. Raising 404 is a valid failure for
     # exists() call.
-    mock_get.side_effect = BadRequest("Try again")
+    mock_get.side_effect = HttpError({'status': 400}, None, None)
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
     self._insert_random_file(self.client, file_name, file_size)
-    with self.assertRaises(BadRequest):
+    with self.assertRaises(HttpError) as cm:
       self.gcs.exists(file_name)
+    self.assertEqual(400, cm.exception.status_code)
 
   def test_checksum(self):
     file_name = 'gs://gcsio-test/dummy_file'
@@ -277,100 +396,179 @@ class TestGCSIO(unittest.TestCase):
     self.assertTrue(self.gcs.exists(file_name))
     self.assertEqual(1234, self.gcs.size(file_name))
 
-  def test_kms_key(self):
-    file_name = 'gs://gcsio-test/dummy_file'
-    file_size = 1234
-    kms_key_name = "dummy"
-
-    self._insert_random_file(
-        self.client, file_name, file_size, kms_key_name=kms_key_name)
-    self.assertTrue(self.gcs.exists(file_name))
-    self.assertEqual(kms_key_name, self.gcs.kms_key(file_name))
-
   def test_last_updated(self):
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
-    updated = datetime.fromtimestamp(123456.78)
+    last_updated = 123456.78
 
-    self._insert_random_file(self.client, file_name, file_size, updated=updated)
+    self._insert_random_file(
+        self.client, file_name, file_size, last_updated=last_updated)
     self.assertTrue(self.gcs.exists(file_name))
-    self.assertEqual(
-        gcsio.GcsIO._updated_to_seconds(updated),
-        self.gcs.last_updated(file_name))
+    self.assertEqual(last_updated, self.gcs.last_updated(file_name))
 
   def test_file_status(self):
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
-    updated = datetime.fromtimestamp(123456.78)
+    last_updated = 123456.78
     checksum = 'deadbeef'
 
     self._insert_random_file(
-        self.client, file_name, file_size, updated=updated, crc32c=checksum)
+        self.client,
+        file_name,
+        file_size,
+        last_updated=last_updated,
+        crc32c=checksum)
     file_checksum = self.gcs.checksum(file_name)
 
     file_status = self.gcs._status(file_name)
 
     self.assertEqual(file_status['size'], file_size)
     self.assertEqual(file_status['checksum'], file_checksum)
-    self.assertEqual(
-        file_status['updated'], gcsio.GcsIO._updated_to_seconds(updated))
+    self.assertEqual(file_status['last_updated'], last_updated)
 
-  def test_file_mode_calls(self):
+  def test_file_mode(self):
     file_name = 'gs://gcsio-test/dummy_mode_file'
-    self._insert_random_file(self.client, file_name)
-    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobWriter') as writer:
-      self.gcs.open(file_name, 'wb')
-      writer.assert_called()
-    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobReader') as reader:
-      self.gcs.open(file_name, 'rb')
-      reader.assert_called()
+    with self.gcs.open(file_name, 'wb') as f:
+      assert f.mode == 'wb'
+    with self.gcs.open(file_name, 'rb') as f:
+      assert f.mode == 'rb'
 
   def test_bad_file_modes(self):
     file_name = 'gs://gcsio-test/dummy_mode_file'
-    self._insert_random_file(self.client, file_name)
     with self.assertRaises(ValueError):
       self.gcs.open(file_name, 'w+')
     with self.assertRaises(ValueError):
       self.gcs.open(file_name, 'r+b')
 
+  def test_empty_batches(self):
+    self.assertEqual([], self.gcs.copy_batch([]))
+    self.assertEqual([], self.gcs.delete_batch([]))
+
   def test_delete(self):
     file_name = 'gs://gcsio-test/delete_me'
     file_size = 1024
-    bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
+
     # Test deletion of non-existent file.
-    bucket = self.client.get_bucket(bucket_name)
     self.gcs.delete(file_name)
 
     self._insert_random_file(self.client, file_name, file_size)
-    self.assertTrue(blob_name in bucket.blobs)
+    self.assertTrue(
+        gcsio.parse_gcs_path(file_name) in self.client.objects.files)
 
     self.gcs.delete(file_name)
 
-    self.assertFalse(blob_name in bucket.blobs)
+    self.assertFalse(
+        gcsio.parse_gcs_path(file_name) in self.client.objects.files)
+
+  @mock.patch(
+      'apache_beam.io.gcp.gcsio.auth.get_service_credentials',
+      wraps=lambda pipeline_options: None)
+  @mock.patch('apache_beam.io.gcp.gcsio.get_new_http')
+  def test_user_agent_passed(self, get_new_http_mock, get_service_creds_mock):
+    client = gcsio.GcsIO()
+    try:
+      client.get_bucket('mabucket')
+    except:  # pylint: disable=bare-except
+      # Ignore errors. The errors come from the fact that we did not mock
+      # the response from the API, so the overall get_bucket call fails
+      # soon after the GCS API is called.
+      pass
+    call = get_new_http_mock.return_value.request.mock_calls[-2]
+    self.assertIn(
+        "apache-beam/%s (GPN:Beam)" % apache_beam.__version__,
+        call[2]['headers']['User-Agent'])
+
+  @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
+  def test_delete_batch(self, *unused_args):
+    gcsio.BatchApiRequest = FakeBatchApiRequest
+    file_name_pattern = 'gs://gcsio-test/delete_me_%d'
+    file_size = 1024
+    num_files = 10
+
+    # Test deletion of non-existent files.
+    result = self.gcs.delete_batch(
+        [file_name_pattern % i for i in range(num_files)])
+    self.assertTrue(result)
+    for i, (file_name, exception) in enumerate(result):
+      self.assertEqual(file_name, file_name_pattern % i)
+      self.assertEqual(exception, None)
+      self.assertFalse(self.gcs.exists(file_name_pattern % i))
+
+    # Insert some files.
+    for i in range(num_files):
+      self._insert_random_file(self.client, file_name_pattern % i, file_size)
+
+    # Check files inserted properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(file_name_pattern % i))
+
+    # Execute batch delete.
+    self.gcs.delete_batch([file_name_pattern % i for i in range(num_files)])
+
+    # Check files deleted properly.
+    for i in range(num_files):
+      self.assertFalse(self.gcs.exists(file_name_pattern % i))
 
   def test_copy(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'
-    src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
-    dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
-    src_bucket = self.client.lookup_bucket(src_bucket_name)
-    dest_bucket = self.client.lookup_bucket(dest_bucket_name)
     file_size = 1024
     self._insert_random_file(self.client, src_file_name, file_size)
-    self.assertTrue(src_blob_name in src_bucket.blobs)
-    self.assertFalse(dest_blob_name in dest_bucket.blobs)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertFalse(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
-    self.gcs.copy(src_file_name, dest_file_name)
+    self.gcs.copy(src_file_name, dest_file_name, dest_kms_key_name='kms_key')
 
-    self.assertTrue(src_blob_name in src_bucket.blobs)
-    self.assertTrue(dest_blob_name in dest_bucket.blobs)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertTrue(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     # Test copy of non-existent files.
-    with self.assertRaises(NotFound):
+    with self.assertRaisesRegex(HttpError, r'Not Found'):
       self.gcs.copy(
           'gs://gcsio-test/non-existent',
           'gs://gcsio-test/non-existent-destination')
 
+  @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
+  def test_copy_batch(self, *unused_args):
+    gcsio.BatchApiRequest = FakeBatchApiRequest
+    from_name_pattern = 'gs://gcsio-test/copy_me_%d'
+    to_name_pattern = 'gs://gcsio-test/destination_%d'
+    file_size = 1024
+    num_files = 10
+
+    result = self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
+                                  for i in range(num_files)],
+                                 dest_kms_key_name='kms_key')
+    self.assertTrue(result)
+    for i, (src, dest, exception) in enumerate(result):
+      self.assertEqual(src, from_name_pattern % i)
+      self.assertEqual(dest, to_name_pattern % i)
+      self.assertTrue(isinstance(exception, IOError))
+      self.assertEqual(exception.errno, errno.ENOENT)
+      self.assertFalse(self.gcs.exists(from_name_pattern % i))
+      self.assertFalse(self.gcs.exists(to_name_pattern % i))
+
+    # Insert some files.
+    for i in range(num_files):
+      self._insert_random_file(self.client, from_name_pattern % i, file_size)
+
+    # Check files inserted properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(from_name_pattern % i))
+
+    # Execute batch copy.
+    self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
+                         for i in range(num_files)])
+
+    # Check files copied properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(from_name_pattern % i))
+      self.assertTrue(self.gcs.exists(to_name_pattern % i))
+
   def test_copytree(self):
     src_dir_name = 'gs://gcsio-test/source/'
     dest_dir_name = 'gs://gcsio-test/dest/'
@@ -379,62 +577,204 @@ class TestGCSIO(unittest.TestCase):
     for path in paths:
       src_file_name = src_dir_name + path
       dest_file_name = dest_dir_name + path
-      src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
-      dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
-      src_bucket = self.client.lookup_bucket(src_bucket_name)
-      dest_bucket = self.client.lookup_bucket(dest_bucket_name)
-      file_size = 1024
       self._insert_random_file(self.client, src_file_name, file_size)
-      self.assertTrue(src_blob_name in src_bucket.blobs)
-      self.assertFalse(dest_blob_name in dest_bucket.blobs)
+      self.assertTrue(
+          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+      self.assertFalse(
+          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     self.gcs.copytree(src_dir_name, dest_dir_name)
 
     for path in paths:
       src_file_name = src_dir_name + path
       dest_file_name = dest_dir_name + path
-      src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
-      dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
-      src_bucket = self.client.lookup_bucket(src_bucket_name)
-      dest_bucket = self.client.lookup_bucket(dest_bucket_name)
-      self.assertTrue(src_blob_name in src_bucket.blobs)
-      self.assertTrue(dest_blob_name in dest_bucket.blobs)
+      self.assertTrue(
+          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+      self.assertTrue(
+          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
   def test_rename(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'
-    src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
-    dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
     file_size = 1024
     self._insert_random_file(self.client, src_file_name, file_size)
-    src_bucket = self.client.lookup_bucket(src_bucket_name)
-    dest_bucket = self.client.lookup_bucket(dest_bucket_name)
-    self.assertTrue(src_blob_name in src_bucket.blobs)
-    self.assertFalse(dest_blob_name in dest_bucket.blobs)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertFalse(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     self.gcs.rename(src_file_name, dest_file_name)
 
-    self.assertFalse(src_blob_name in src_bucket.blobs)
-    self.assertTrue(dest_blob_name in dest_bucket.blobs)
+    self.assertFalse(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertTrue(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
+
+  def test_full_file_read(self):
+    file_name = 'gs://gcsio-test/full_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    f = self.gcs.open(file_name)
+    self.assertEqual(f.mode, 'r')
+    f.seek(0, os.SEEK_END)
+    self.assertEqual(f.tell(), file_size)
+    self.assertEqual(f.read(), b'')
+    f.seek(0)
+    self.assertEqual(f.read(), random_file.contents)
+
+  def test_file_random_seek(self):
+    file_name = 'gs://gcsio-test/seek_file'
+    file_size = 5 * 1024 * 1024 - 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+
+    f = self.gcs.open(file_name)
+    random.seed(0)
+    for _ in range(0, 10):
+      a = random.randint(0, file_size - 1)
+      b = random.randint(0, file_size - 1)
+      start, end = min(a, b), max(a, b)
+      f.seek(start)
+      self.assertEqual(f.tell(), start)
+      self.assertEqual(
+          f.read(end - start + 1), random_file.contents[start:end + 1])
+      self.assertEqual(f.tell(), end + 1)
+
+  def test_file_iterator(self):
+    file_name = 'gs://gcsio-test/iterating_file'
+    lines = []
+    line_count = 10
+    for _ in range(line_count):
+      line_length = random.randint(100, 500)
+      line = os.urandom(line_length).replace(b'\n', b' ') + b'\n'
+      lines.append(line)
+
+    contents = b''.join(lines)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.client.objects.add_file(FakeFile(bucket, name, contents, 1))
+
+    f = self.gcs.open(file_name)
+
+    read_lines = 0
+    for line in f:
+      read_lines += 1
 
-  def test_file_buffered_read_call(self):
+    self.assertEqual(read_lines, line_count)
+
+  def test_file_read_line(self):
     file_name = 'gs://gcsio-test/read_line_file'
+    lines = []
+
+    # Set a small buffer size to exercise refilling the buffer.
+    # First line is carefully crafted so the newline falls as the last character
+    # of the buffer to exercise this code path.
     read_buffer_size = 1024
-    self._insert_random_file(self.client, file_name, 10240)
+    lines.append(b'x' * 1023 + b'\n')
+
+    for _ in range(1, 1000):
+      line_length = random.randint(100, 500)
+      line = os.urandom(line_length).replace(b'\n', b' ') + b'\n'
+      lines.append(line)
+    contents = b''.join(lines)
+
+    file_size = len(contents)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.client.objects.add_file(FakeFile(bucket, name, contents, 1))
+
+    f = self.gcs.open(file_name, read_buffer_size=read_buffer_size)
+
+    # Test read of first two lines.
+    f.seek(0)
+    self.assertEqual(f.readline(), lines[0])
+    self.assertEqual(f.tell(), len(lines[0]))
+    self.assertEqual(f.readline(), lines[1])
+
+    # Test read at line boundary.
+    f.seek(file_size - len(lines[-1]) - 1)
+    self.assertEqual(f.readline(), b'\n')
+
+    # Test read at end of file.
+    f.seek(file_size)
+    self.assertEqual(f.readline(), b'')
+
+    # Test reads at random positions.
+    random.seed(0)
+    for _ in range(0, 10):
+      start = random.randint(0, file_size - 1)
+      line_index = 0
+      # Find line corresponding to start index.
+      chars_left = start
+      while True:
+        next_line_length = len(lines[line_index])
+        if chars_left - next_line_length < 0:
+          break
+        chars_left -= next_line_length
+        line_index += 1
+      f.seek(start)
+      self.assertEqual(f.readline(), lines[line_index][chars_left:])
+
+  def test_file_write(self):
+    file_name = 'gs://gcsio-test/write_file'
+    file_size = 5 * 1024 * 1024 + 2000
+    contents = os.urandom(file_size)
+    f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
+    f.write(contents[0:1000])
+    f.write(contents[1000:1024 * 1024])
+    f.write(contents[1024 * 1024:])
+    f.close()
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
+
+  def test_file_close(self):
+    file_name = 'gs://gcsio-test/close_file'
+    file_size = 5 * 1024 * 1024 + 2000
+    contents = os.urandom(file_size)
+    f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
+    f.write(contents)
+    f.close()
+    f.close()  # This should not crash.
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
+
+  def test_file_flush(self):
+    file_name = 'gs://gcsio-test/flush_file'
+    file_size = 5 * 1024 * 1024 + 2000
+    contents = os.urandom(file_size)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
+    f.write(contents[0:1000])
+    f.flush()
+    f.write(contents[1000:1024 * 1024])
+    f.flush()
+    f.flush()  # Should be a NOOP.
+    f.write(contents[1024 * 1024:])
+    f.close()  # This should already call the equivalent of flush() in its body.
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
 
-    bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
-    bucket = self.client.get_bucket(bucket_name)
-    blob = bucket.get_blob(blob_name)
+  def test_context_manager(self):
+    # Test writing with a context manager.
+    file_name = 'gs://gcsio-test/context_manager_file'
+    file_size = 1024
+    contents = os.urandom(file_size)
+    with self.gcs.open(file_name, 'w') as f:
+      f.write(contents)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
 
-    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobReader') as reader:
-      self.gcs.open(file_name, read_buffer_size=read_buffer_size)
-      reader.assert_called_with(blob, chunk_size=read_buffer_size)
+    # Test reading with a context manager.
+    with self.gcs.open(file_name) as f:
+      self.assertEqual(f.read(), contents)
 
-  def test_file_write_call(self):
-    file_name = 'gs://gcsio-test/write_file'
-    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobWriter') as writer:
-      self.gcs.open(file_name, 'w')
-      writer.assert_called()
+    # Test that exceptions are not swallowed by the context manager.
+    with self.assertRaises(ZeroDivisionError):
+      with self.gcs.open(file_name) as f:
+        f.read(0 // 0)
 
   def test_list_prefix(self):
     bucket_name = 'gcsio-test'
@@ -472,11 +812,141 @@ class TestGCSIO(unittest.TestCase):
           set(self.gcs.list_prefix(file_pattern).items()),
           set(expected_file_names))
 
+  def test_mime_binary_encoding(self):
+    # This test verifies that the MIME email_generator library works properly
+    # and does not corrupt '\r\n' during uploads (the patch to apitools in
+    # Python 3 is applied in io/gcp/__init__.py).
+    from apitools.base.py.transfer import email_generator
+    generator_cls = email_generator.BytesGenerator
+    output_buffer = io.BytesIO()
+    generator = generator_cls(output_buffer)
+    test_msg = 'a\nb\r\nc\n\r\n\n\nd'
+    message = Message()
+    message.set_payload(test_msg)
+    generator._handle_text(message)
+    self.assertEqual(test_msg.encode('ascii'), output_buffer.getvalue())
+
+  def test_downloader_monitoring_info(self):
+    # Clear the process wide metric container.
+    MetricsEnvironment.process_wide_container().reset()
+
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    self.gcs.open(file_name, 'r')
+
+    resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.get',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
+        monitoring_infos.GCS_PROJECT_ID_LABEL: str(DEFAULT_PROJECT_NUMBER),
+        monitoring_infos.STATUS_LABEL: 'ok'
+    }
+
+    metric_name = MetricName(
+        None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
+    metric_value = MetricsEnvironment.process_wide_container().get_counter(
+        metric_name).get_cumulative()
+
+    self.assertEqual(metric_value, 2)
+
+  @mock.patch.object(FakeGcsBuckets, 'Get')
+  def test_downloader_fail_to_get_project_number(self, mock_get):
+    # Raising an error when listing GCS Bucket so that project number fails to
+    # be retrieved.
+    mock_get.side_effect = HttpError({'status': 403}, None, None)
+    # Clear the process wide metric container.
+    MetricsEnvironment.process_wide_container().reset()
+
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    self.gcs.open(file_name, 'r')
+
+    resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.get',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
+        monitoring_infos.GCS_PROJECT_ID_LABEL: str(DEFAULT_PROJECT_NUMBER),
+        monitoring_infos.STATUS_LABEL: 'ok'
+    }
+
+    metric_name = MetricName(
+        None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
+    metric_value = MetricsEnvironment.process_wide_container().get_counter(
+        metric_name).get_cumulative()
+
+    self.assertEqual(metric_value, 0)
+
+    labels_without_project_id = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.get',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
+        monitoring_infos.STATUS_LABEL: 'ok'
+    }
+    metric_name = MetricName(
+        None,
+        None,
+        urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        labels=labels_without_project_id)
+    metric_value = MetricsEnvironment.process_wide_container().get_counter(
+        metric_name).get_cumulative()
+
+    self.assertEqual(metric_value, 2)
+
   def test_downloader_fail_non_existent_object(self):
     file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
-    with self.assertRaises(NotFound):
+    with self.assertRaises(IOError):
+      self.gcs.open(file_name, 'r')
+
+  def test_downloader_fail_when_getting_metadata(self):
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    self._insert_random_file(
+        self.client, file_name, file_size, fail_when_getting_metadata=True)
+    with self.assertRaises(HttpError):
+      self.gcs.open(file_name, 'r')
+
+  def test_downloader_fail_when_reading(self):
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    self._insert_random_file(
+        self.client, file_name, file_size, fail_when_reading=True)
+    with self.assertRaises(HttpError):
       self.gcs.open(file_name, 'r')
 
+  def test_uploader_monitoring_info(self):
+    # Clear the process wide metric container.
+    MetricsEnvironment.process_wide_container().reset()
+
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    f = self.gcs.open(file_name, 'w')
+
+    resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.insert',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
+        monitoring_infos.GCS_PROJECT_ID_LABEL: str(DEFAULT_PROJECT_NUMBER),
+        monitoring_infos.STATUS_LABEL: 'ok'
+    }
+
+    f.close()
+    metric_name = MetricName(
+        None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
+    metric_value = MetricsEnvironment.process_wide_container().get_counter(
+        metric_name).get_cumulative()
+
+    self.assertEqual(metric_value, 1)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
new file mode 100644
index 00000000000..b37a4b57c11
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Common imports for generated storage client library."""
+# pylint:disable=wildcard-import
+
+import pkgutil
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import *
+  from apache_beam.io.gcp.internal.clients.storage.storage_v1_client import *
+  from apache_beam.io.gcp.internal.clients.storage.storage_v1_messages import *
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position
+
+__path__ = pkgutil.extend_path(__path__, __name__)  # type: ignore
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
new file mode 100644
index 00000000000..e5b7c0268ec
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
@@ -0,0 +1,1517 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Generated client library for storage version v1."""
+# NOTE: This file is autogenerated and should not be edited by hand.
+
+from apitools.base.py import base_api
+
+from apache_beam.io.gcp.gcsio_overrides import GcsIOOverrides
+from apache_beam.io.gcp.internal.clients.storage import \
+    storage_v1_messages as messages
+
+
+class StorageV1(base_api.BaseApiClient):
+  """Generated client library for service storage version v1."""
+
+  MESSAGES_MODULE = messages
+  BASE_URL = u'https://www.googleapis.com/storage/v1/'
+
+  _PACKAGE = u'storage'
+  _SCOPES = [
+      u'https://www.googleapis.com/auth/cloud-platform',
+      u'https://www.googleapis.com/auth/cloud-platform.read-only',
+      u'https://www.googleapis.com/auth/devstorage.full_control',
+      u'https://www.googleapis.com/auth/devstorage.read_only',
+      u'https://www.googleapis.com/auth/devstorage.read_write'
+  ]
+  _VERSION = u'v1'
+  _CLIENT_ID = '1042881264118.apps.googleusercontent.com'
+  _CLIENT_SECRET = 'x_Tw5K8nnjoRAqULM9PFAC2b'
+  _USER_AGENT = 'x_Tw5K8nnjoRAqULM9PFAC2b'
+  _CLIENT_CLASS_NAME = u'StorageV1'
+  _URL_VERSION = u'v1'
+  _API_KEY = None
+
+  def __init__(
+      self,
+      url='',
+      credentials=None,
+      get_credentials=True,
+      http=None,
+      model=None,
+      log_request=False,
+      log_response=False,
+      credentials_args=None,
+      default_global_params=None,
+      additional_http_headers=None,
+      response_encoding=None):
+    """Create a new storage handle."""
+    url = url or self.BASE_URL
+    super().__init__(
+        url,
+        credentials=credentials,
+        get_credentials=get_credentials,
+        http=http,
+        model=model,
+        log_request=log_request,
+        log_response=log_response,
+        num_retries=20,
+        credentials_args=credentials_args,
+        default_global_params=default_global_params,
+        additional_http_headers=additional_http_headers,
+        retry_func=GcsIOOverrides.retry_func,
+        response_encoding=response_encoding)
+    self.bucketAccessControls = self.BucketAccessControlsService(self)
+    self.buckets = self.BucketsService(self)
+    self.channels = self.ChannelsService(self)
+    self.defaultObjectAccessControls = self.DefaultObjectAccessControlsService(
+        self)
+    self.notifications = self.NotificationsService(self)
+    self.objectAccessControls = self.ObjectAccessControlsService(self)
+    self.objects = self.ObjectsService(self)
+    self.projects_serviceAccount = self.ProjectsServiceAccountService(self)
+    self.projects = self.ProjectsService(self)
+
+  class BucketAccessControlsService(base_api.BaseApiService):
+    """Service class for the bucketAccessControls resource."""
+
+    _NAME = u'bucketAccessControls'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes the ACL entry for the specified entity on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageBucketAccessControlsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.bucketAccessControls.delete',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl/{entity}',
+        request_field='',
+        request_type_name=u'StorageBucketAccessControlsDeleteRequest',
+        response_type_name=u'StorageBucketAccessControlsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""Returns the ACL entry for the specified entity on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.bucketAccessControls.get',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl/{entity}',
+        request_field='',
+        request_type_name=u'StorageBucketAccessControlsGetRequest',
+        response_type_name=u'BucketAccessControl',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a new ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.bucketAccessControls.insert',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl',
+        request_field=u'bucketAccessControl',
+        request_type_name=u'StorageBucketAccessControlsInsertRequest',
+        response_type_name=u'BucketAccessControl',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves ACL entries on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControls) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.bucketAccessControls.list',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl',
+        request_field='',
+        request_type_name=u'StorageBucketAccessControlsListRequest',
+        response_type_name=u'BucketAccessControls',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches an ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.bucketAccessControls.patch',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl/{entity}',
+        request_field=u'bucketAccessControl',
+        request_type_name=u'StorageBucketAccessControlsPatchRequest',
+        response_type_name=u'BucketAccessControl',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates an ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.bucketAccessControls.update',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl/{entity}',
+        request_field=u'bucketAccessControl',
+        request_type_name=u'StorageBucketAccessControlsUpdateRequest',
+        response_type_name=u'BucketAccessControl',
+        supports_download=False,
+    )
+
+  class BucketsService(base_api.BaseApiService):
+    """Service class for the buckets resource."""
+
+    _NAME = u'buckets'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes an empty bucket.
+
+      Args:
+        request: (StorageBucketsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageBucketsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.buckets.delete',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=
+        [u'ifMetagenerationMatch', u'ifMetagenerationNotMatch', u'userProject'],
+        relative_path=u'b/{bucket}',
+        request_field='',
+        request_type_name=u'StorageBucketsDeleteRequest',
+        response_type_name=u'StorageBucketsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""Returns metadata for the specified bucket.
+
+      Args:
+        request: (StorageBucketsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.buckets.get',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}',
+        request_field='',
+        request_type_name=u'StorageBucketsGetRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+    def GetIamPolicy(self, request, global_params=None):
+      r"""Returns an IAM policy for the specified bucket.
+
+      Args:
+        request: (StorageBucketsGetIamPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Policy) The response message.
+      """
+      config = self.GetMethodConfig('GetIamPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    GetIamPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.buckets.getIamPolicy',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/iam',
+        request_field='',
+        request_type_name=u'StorageBucketsGetIamPolicyRequest',
+        response_type_name=u'Policy',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a new bucket.
+
+      Args:
+        request: (StorageBucketsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.buckets.insert',
+        ordered_params=[u'project'],
+        path_params=[],
+        query_params=[
+            u'predefinedAcl',
+            u'predefinedDefaultObjectAcl',
+            u'project',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b',
+        request_field=u'bucket',
+        request_type_name=u'StorageBucketsInsertRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves a list of buckets for a given project.
+
+      Args:
+        request: (StorageBucketsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Buckets) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.buckets.list',
+        ordered_params=[u'project'],
+        path_params=[],
+        query_params=[
+            u'maxResults',
+            u'pageToken',
+            u'prefix',
+            u'project',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b',
+        request_field='',
+        request_type_name=u'StorageBucketsListRequest',
+        response_type_name=u'Buckets',
+        supports_download=False,
+    )
+
+    def LockRetentionPolicy(self, request, global_params=None):
+      r"""Locks retention policy on a bucket.
+
+      Args:
+        request: (StorageBucketsLockRetentionPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('LockRetentionPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    LockRetentionPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.buckets.lockRetentionPolicy',
+        ordered_params=[u'bucket', u'ifMetagenerationMatch'],
+        path_params=[u'bucket'],
+        query_params=[u'ifMetagenerationMatch', u'userProject'],
+        relative_path=u'b/{bucket}/lockRetentionPolicy',
+        request_field='',
+        request_type_name=u'StorageBucketsLockRetentionPolicyRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches a bucket. Changes to the bucket will be readable immediately after writing, but configuration changes may take time to propagate.
+
+      Args:
+        request: (StorageBucketsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.buckets.patch',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'predefinedAcl',
+            u'predefinedDefaultObjectAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}',
+        request_field=u'bucketResource',
+        request_type_name=u'StorageBucketsPatchRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+    def SetIamPolicy(self, request, global_params=None):
+      r"""Updates an IAM policy for the specified bucket.
+
+      Args:
+        request: (StorageBucketsSetIamPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Policy) The response message.
+      """
+      config = self.GetMethodConfig('SetIamPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    SetIamPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.buckets.setIamPolicy',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/iam',
+        request_field=u'policy',
+        request_type_name=u'StorageBucketsSetIamPolicyRequest',
+        response_type_name=u'Policy',
+        supports_download=False,
+    )
+
+    def TestIamPermissions(self, request, global_params=None):
+      r"""Tests a set of permissions on the given bucket to see which, if any, are held by the caller.
+
+      Args:
+        request: (StorageBucketsTestIamPermissionsRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (TestIamPermissionsResponse) The response message.
+      """
+      config = self.GetMethodConfig('TestIamPermissions')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    TestIamPermissions.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.buckets.testIamPermissions',
+        ordered_params=[u'bucket', u'permissions'],
+        path_params=[u'bucket'],
+        query_params=[u'permissions', u'userProject'],
+        relative_path=u'b/{bucket}/iam/testPermissions',
+        request_field='',
+        request_type_name=u'StorageBucketsTestIamPermissionsRequest',
+        response_type_name=u'TestIamPermissionsResponse',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates a bucket. Changes to the bucket will be readable immediately after writing, but configuration changes may take time to propagate.
+
+      Args:
+        request: (StorageBucketsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.buckets.update',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'predefinedAcl',
+            u'predefinedDefaultObjectAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}',
+        request_field=u'bucketResource',
+        request_type_name=u'StorageBucketsUpdateRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+  class ChannelsService(base_api.BaseApiService):
+    """Service class for the channels resource."""
+
+    _NAME = u'channels'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Stop(self, request, global_params=None):
+      r"""Stop watching resources through this channel.
+
+      Args:
+        request: (Channel) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageChannelsStopResponse) The response message.
+      """
+      config = self.GetMethodConfig('Stop')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Stop.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.channels.stop',
+        ordered_params=[],
+        path_params=[],
+        query_params=[],
+        relative_path=u'channels/stop',
+        request_field='<request>',
+        request_type_name=u'Channel',
+        response_type_name=u'StorageChannelsStopResponse',
+        supports_download=False,
+    )
+
+  class DefaultObjectAccessControlsService(base_api.BaseApiService):
+    """Service class for the defaultObjectAccessControls resource."""
+
+    _NAME = u'defaultObjectAccessControls'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes the default object ACL entry for the specified entity on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageDefaultObjectAccessControlsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.defaultObjectAccessControls.delete',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl/{entity}',
+        request_field='',
+        request_type_name=u'StorageDefaultObjectAccessControlsDeleteRequest',
+        response_type_name=u'StorageDefaultObjectAccessControlsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""Returns the default object ACL entry for the specified entity on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.defaultObjectAccessControls.get',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl/{entity}',
+        request_field='',
+        request_type_name=u'StorageDefaultObjectAccessControlsGetRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a new default object ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.defaultObjectAccessControls.insert',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageDefaultObjectAccessControlsInsertRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves default object ACL entries on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControls) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.defaultObjectAccessControls.list',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=
+        [u'ifMetagenerationMatch', u'ifMetagenerationNotMatch', u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl',
+        request_field='',
+        request_type_name=u'StorageDefaultObjectAccessControlsListRequest',
+        response_type_name=u'ObjectAccessControls',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches a default object ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.defaultObjectAccessControls.patch',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl/{entity}',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageDefaultObjectAccessControlsPatchRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates a default object ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.defaultObjectAccessControls.update',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl/{entity}',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageDefaultObjectAccessControlsUpdateRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+  class NotificationsService(base_api.BaseApiService):
+    """Service class for the notifications resource."""
+
+    _NAME = u'notifications'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes a notification subscription.
+
+      Args:
+        request: (StorageNotificationsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageNotificationsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.notifications.delete',
+        ordered_params=[u'bucket', u'notification'],
+        path_params=[u'bucket', u'notification'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/notificationConfigs/{notification}',
+        request_field='',
+        request_type_name=u'StorageNotificationsDeleteRequest',
+        response_type_name=u'StorageNotificationsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""View a notification configuration.
+
+      Args:
+        request: (StorageNotificationsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Notification) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.notifications.get',
+        ordered_params=[u'bucket', u'notification'],
+        path_params=[u'bucket', u'notification'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/notificationConfigs/{notification}',
+        request_field='',
+        request_type_name=u'StorageNotificationsGetRequest',
+        response_type_name=u'Notification',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a notification subscription for a given bucket.
+
+      Args:
+        request: (StorageNotificationsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Notification) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.notifications.insert',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/notificationConfigs',
+        request_field=u'notification',
+        request_type_name=u'StorageNotificationsInsertRequest',
+        response_type_name=u'Notification',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves a list of notification subscriptions for a given bucket.
+
+      Args:
+        request: (StorageNotificationsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Notifications) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.notifications.list',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/notificationConfigs',
+        request_field='',
+        request_type_name=u'StorageNotificationsListRequest',
+        response_type_name=u'Notifications',
+        supports_download=False,
+    )
+
+  class ObjectAccessControlsService(base_api.BaseApiService):
+    """Service class for the objectAccessControls resource."""
+
+    _NAME = u'objectAccessControls'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes the ACL entry for the specified entity on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageObjectAccessControlsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.objectAccessControls.delete',
+        ordered_params=[u'bucket', u'object', u'entity'],
+        path_params=[u'bucket', u'entity', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl/{entity}',
+        request_field='',
+        request_type_name=u'StorageObjectAccessControlsDeleteRequest',
+        response_type_name=u'StorageObjectAccessControlsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""Returns the ACL entry for the specified entity on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objectAccessControls.get',
+        ordered_params=[u'bucket', u'object', u'entity'],
+        path_params=[u'bucket', u'entity', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl/{entity}',
+        request_field='',
+        request_type_name=u'StorageObjectAccessControlsGetRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a new ACL entry on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objectAccessControls.insert',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageObjectAccessControlsInsertRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves ACL entries on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControls) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objectAccessControls.list',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl',
+        request_field='',
+        request_type_name=u'StorageObjectAccessControlsListRequest',
+        response_type_name=u'ObjectAccessControls',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches an ACL entry on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.objectAccessControls.patch',
+        ordered_params=[u'bucket', u'object', u'entity'],
+        path_params=[u'bucket', u'entity', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl/{entity}',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageObjectAccessControlsPatchRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates an ACL entry on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.objectAccessControls.update',
+        ordered_params=[u'bucket', u'object', u'entity'],
+        path_params=[u'bucket', u'entity', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl/{entity}',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageObjectAccessControlsUpdateRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+  class ObjectsService(base_api.BaseApiService):
+    """Service class for the objects resource."""
+
+    _NAME = u'objects'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {
+          'Insert': base_api.ApiUploadInfo(
+              accept=['*/*'],
+              max_size=None,
+              resumable_multipart=True,
+              resumable_path=u'/resumable/upload/storage/v1/b/{bucket}/o',
+              simple_multipart=True,
+              simple_path=u'/upload/storage/v1/b/{bucket}/o',
+          ),
+      }
+
+    def Compose(self, request, global_params=None):
+      r"""Concatenates a list of existing objects into a new object in the same bucket.
+
+      Args:
+        request: (StorageObjectsComposeRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Compose')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Compose.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.compose',
+        ordered_params=[u'destinationBucket', u'destinationObject'],
+        path_params=[u'destinationBucket', u'destinationObject'],
+        query_params=[
+            u'destinationPredefinedAcl',
+            u'ifGenerationMatch',
+            u'ifMetagenerationMatch',
+            u'kmsKeyName',
+            u'userProject'
+        ],
+        relative_path=u'b/{destinationBucket}/o/{destinationObject}/compose',
+        request_field=u'composeRequest',
+        request_type_name=u'StorageObjectsComposeRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def Copy(self, request, global_params=None):
+      r"""Copies a source object to a destination object. Optionally overrides metadata.
+
+      Args:
+        request: (StorageObjectsCopyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Copy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Copy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.copy',
+        ordered_params=[
+            u'sourceBucket',
+            u'sourceObject',
+            u'destinationBucket',
+            u'destinationObject'
+        ],
+        path_params=[
+            u'destinationBucket',
+            u'destinationObject',
+            u'sourceBucket',
+            u'sourceObject'
+        ],
+        query_params=[
+            u'destinationPredefinedAcl',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'ifSourceGenerationMatch',
+            u'ifSourceGenerationNotMatch',
+            u'ifSourceMetagenerationMatch',
+            u'ifSourceMetagenerationNotMatch',
+            u'projection',
+            u'sourceGeneration',
+            u'userProject'
+        ],
+        relative_path=
+        u'b/{sourceBucket}/o/{sourceObject}/copyTo/b/{destinationBucket}/o/{destinationObject}',
+        request_field=u'object',
+        request_type_name=u'StorageObjectsCopyRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def Delete(self, request, global_params=None):
+      r"""Deletes an object and its metadata. Deletions are permanent if versioning is not enabled for the bucket, or if the generation parameter is used.
+
+      Args:
+        request: (StorageObjectsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageObjectsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.objects.delete',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[
+            u'generation',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o/{object}',
+        request_field='',
+        request_type_name=u'StorageObjectsDeleteRequest',
+        response_type_name=u'StorageObjectsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None, download=None):
+      r"""Retrieves an object or its metadata.
+
+      Args:
+        request: (StorageObjectsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+        download: (Download, default: None) If present, download
+            data from the request via this stream.
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(
+          config, request, global_params=global_params, download=download)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objects.get',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[
+            u'generation',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o/{object}',
+        request_field='',
+        request_type_name=u'StorageObjectsGetRequest',
+        response_type_name=u'Object',
+        supports_download=True,
+    )
+
+    def GetIamPolicy(self, request, global_params=None):
+      r"""Returns an IAM policy for the specified object.
+
+      Args:
+        request: (StorageObjectsGetIamPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Policy) The response message.
+      """
+      config = self.GetMethodConfig('GetIamPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    GetIamPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objects.getIamPolicy',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/iam',
+        request_field='',
+        request_type_name=u'StorageObjectsGetIamPolicyRequest',
+        response_type_name=u'Policy',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None, upload=None):
+      r"""Stores a new object and metadata.
+
+      Args:
+        request: (StorageObjectsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+        upload: (Upload, default: None) If present, upload
+            this stream with the request.
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      upload_config = self.GetUploadConfig('Insert')
+      return self._RunMethod(
+          config,
+          request,
+          global_params=global_params,
+          upload=upload,
+          upload_config=upload_config)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.insert',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'contentEncoding',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'kmsKeyName',
+            u'name',
+            u'predefinedAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o',
+        request_field=u'object',
+        request_type_name=u'StorageObjectsInsertRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves a list of objects matching the criteria.
+
+      Args:
+        request: (StorageObjectsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Objects) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objects.list',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'delimiter',
+            u'includeTrailingDelimiter',
+            u'maxResults',
+            u'pageToken',
+            u'prefix',
+            u'projection',
+            u'userProject',
+            u'versions'
+        ],
+        relative_path=u'b/{bucket}/o',
+        request_field='',
+        request_type_name=u'StorageObjectsListRequest',
+        response_type_name=u'Objects',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches an object's metadata.
+
+      Args:
+        request: (StorageObjectsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.objects.patch',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[
+            u'generation',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'predefinedAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o/{object}',
+        request_field=u'objectResource',
+        request_type_name=u'StorageObjectsPatchRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def Rewrite(self, request, global_params=None):
+      r"""Rewrites a source object to a destination object. Optionally overrides metadata.
+
+      Args:
+        request: (StorageObjectsRewriteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (RewriteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Rewrite')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Rewrite.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.rewrite',
+        ordered_params=[
+            u'sourceBucket',
+            u'sourceObject',
+            u'destinationBucket',
+            u'destinationObject'
+        ],
+        path_params=[
+            u'destinationBucket',
+            u'destinationObject',
+            u'sourceBucket',
+            u'sourceObject'
+        ],
+        query_params=[
+            u'destinationKmsKeyName',
+            u'destinationPredefinedAcl',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'ifSourceGenerationMatch',
+            u'ifSourceGenerationNotMatch',
+            u'ifSourceMetagenerationMatch',
+            u'ifSourceMetagenerationNotMatch',
+            u'maxBytesRewrittenPerCall',
+            u'projection',
+            u'rewriteToken',
+            u'sourceGeneration',
+            u'userProject'
+        ],
+        relative_path=
+        u'b/{sourceBucket}/o/{sourceObject}/rewriteTo/b/{destinationBucket}/o/{destinationObject}',
+        request_field=u'object',
+        request_type_name=u'StorageObjectsRewriteRequest',
+        response_type_name=u'RewriteResponse',
+        supports_download=False,
+    )
+
+    def SetIamPolicy(self, request, global_params=None):
+      r"""Updates an IAM policy for the specified object.
+
+      Args:
+        request: (StorageObjectsSetIamPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Policy) The response message.
+      """
+      config = self.GetMethodConfig('SetIamPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    SetIamPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.objects.setIamPolicy',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/iam',
+        request_field=u'policy',
+        request_type_name=u'StorageObjectsSetIamPolicyRequest',
+        response_type_name=u'Policy',
+        supports_download=False,
+    )
+
+    def TestIamPermissions(self, request, global_params=None):
+      r"""Tests a set of permissions on the given object to see which, if any, are held by the caller.
+
+      Args:
+        request: (StorageObjectsTestIamPermissionsRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (TestIamPermissionsResponse) The response message.
+      """
+      config = self.GetMethodConfig('TestIamPermissions')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    TestIamPermissions.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objects.testIamPermissions',
+        ordered_params=[u'bucket', u'object', u'permissions'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'permissions', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/iam/testPermissions',
+        request_field='',
+        request_type_name=u'StorageObjectsTestIamPermissionsRequest',
+        response_type_name=u'TestIamPermissionsResponse',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates an object's metadata.
+
+      Args:
+        request: (StorageObjectsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.objects.update',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[
+            u'generation',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'predefinedAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o/{object}',
+        request_field=u'objectResource',
+        request_type_name=u'StorageObjectsUpdateRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def WatchAll(self, request, global_params=None):
+      r"""Watch for changes on all objects in a bucket.
+
+      Args:
+        request: (StorageObjectsWatchAllRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Channel) The response message.
+      """
+      config = self.GetMethodConfig('WatchAll')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    WatchAll.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.watchAll',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'delimiter',
+            u'includeTrailingDelimiter',
+            u'maxResults',
+            u'pageToken',
+            u'prefix',
+            u'projection',
+            u'userProject',
+            u'versions'
+        ],
+        relative_path=u'b/{bucket}/o/watch',
+        request_field=u'channel',
+        request_type_name=u'StorageObjectsWatchAllRequest',
+        response_type_name=u'Channel',
+        supports_download=False,
+    )
+
+  class ProjectsServiceAccountService(base_api.BaseApiService):
+    """Service class for the projects_serviceAccount resource."""
+
+    _NAME = u'projects_serviceAccount'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Get(self, request, global_params=None):
+      r"""Get the email address of this project's Google Cloud Storage service account.
+
+      Args:
+        request: (StorageProjectsServiceAccountGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ServiceAccount) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.projects.serviceAccount.get',
+        ordered_params=[u'projectId'],
+        path_params=[u'projectId'],
+        query_params=[u'userProject'],
+        relative_path=u'projects/{projectId}/serviceAccount',
+        request_field='',
+        request_type_name=u'StorageProjectsServiceAccountGetRequest',
+        response_type_name=u'ServiceAccount',
+        supports_download=False,
+    )
+
+  class ProjectsService(base_api.BaseApiService):
+    """Service class for the projects resource."""
+
+    _NAME = u'projects'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
new file mode 100644
index 00000000000..caef0eb4b03
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
@@ -0,0 +1,2714 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Generated message classes for storage version v1.
+
+Stores and retrieves potentially large, immutable data objects.
+"""
+# NOTE: This file is autogenerated and should not be edited by hand.
+
+from apitools.base.protorpclite import message_types as _message_types
+from apitools.base.protorpclite import messages as _messages
+from apitools.base.py import encoding, extra_types
+
+package = 'storage'
+
+
+class Bucket(_messages.Message):
+  r"""A bucket.
+
+  Messages:
+    BillingValue: The bucket's billing configuration.
+    CorsValueListEntry: A CorsValueListEntry object.
+    EncryptionValue: Encryption configuration for a bucket.
+    LabelsValue: User-provided labels, in key/value pairs.
+    LifecycleValue: The bucket's lifecycle configuration. See lifecycle
+      management for more information.
+    LoggingValue: The bucket's logging configuration, which defines the
+      destination bucket and optional name prefix for the current bucket's
+      logs.
+    OwnerValue: The owner of the bucket. This is always the project team's
+      owner group.
+    RetentionPolicyValue: The bucket's retention policy. The retention policy
+      enforces a minimum retention time for all objects contained in the
+      bucket, based on their creation time. Any attempt to overwrite or delete
+      objects younger than the retention period will result in a
+      PERMISSION_DENIED error. An unlocked retention policy can be modified or
+      removed from the bucket via a storage.buckets.update operation. A locked
+      retention policy cannot be removed or shortened in duration for the
+      lifetime of the bucket. Attempting to remove or decrease period of a
+      locked retention policy will result in a PERMISSION_DENIED error.
+    VersioningValue: The bucket's versioning configuration.
+    WebsiteValue: The bucket's website configuration, controlling how the
+      service behaves when accessing bucket contents as a web site. See the
+      Static Website Examples for more information.
+
+  Fields:
+    acl: Access controls on the bucket.
+    billing: The bucket's billing configuration.
+    cors: The bucket's Cross-Origin Resource Sharing (CORS) configuration.
+    defaultEventBasedHold: The default value for event-based hold on newly
+      created objects in this bucket. Event-based hold is a way to retain
+      objects indefinitely until an event occurs, signified by the hold's
+      release. After being released, such objects will be subject to bucket-
+      level retention (if any). One sample use case of this flag is for banks
+      to hold loan documents for at least 3 years after loan is paid in full.
+      Here, bucket-level retention is 3 years and the event is loan being paid
+      in full. In this example, these objects will be held intact for any
+      number of years until the event has occurred (event-based hold on the
+      object is released) and then 3 more years after that. That means
+      retention duration of the objects begins from the moment event-based
+      hold transitioned from true to false. Objects under event-based hold
+      cannot be deleted, overwritten or archived until the hold is removed.
+    defaultObjectAcl: Default access controls to apply to new objects when no
+      ACL is provided.
+    encryption: Encryption configuration for a bucket.
+    etag: HTTP 1.1 Entity tag for the bucket.
+    id: The ID of the bucket. For buckets, the id and name properties are the
+      same.
+    kind: The kind of item this is. For buckets, this is always
+      storage#bucket.
+    labels: User-provided labels, in key/value pairs.
+    lifecycle: The bucket's lifecycle configuration. See lifecycle management
+      for more information.
+    location: The location of the bucket. Object data for objects in the
+      bucket resides in physical storage within this region. Defaults to US.
+      See the developer's guide for the authoritative list.
+    logging: The bucket's logging configuration, which defines the destination
+      bucket and optional name prefix for the current bucket's logs.
+    metageneration: The metadata generation of this bucket.
+    name: The name of the bucket.
+    owner: The owner of the bucket. This is always the project team's owner
+      group.
+    projectNumber: The project number of the project the bucket belongs to.
+    retentionPolicy: The bucket's retention policy. The retention policy
+      enforces a minimum retention time for all objects contained in the
+      bucket, based on their creation time. Any attempt to overwrite or delete
+      objects younger than the retention period will result in a
+      PERMISSION_DENIED error. An unlocked retention policy can be modified or
+      removed from the bucket via a storage.buckets.update operation. A locked
+      retention policy cannot be removed or shortened in duration for the
+      lifetime of the bucket. Attempting to remove or decrease period of a
+      locked retention policy will result in a PERMISSION_DENIED error.
+    selfLink: The URI of this bucket.
+    storageClass: The bucket's default storage class, used whenever no
+      storageClass is specified for a newly-created object. This defines how
+      objects in the bucket are stored and determines the SLA and the cost of
+      storage. Values include MULTI_REGIONAL, REGIONAL, STANDARD, NEARLINE,
+      COLDLINE, and DURABLE_REDUCED_AVAILABILITY. If this value is not
+      specified when the bucket is created, it will default to STANDARD. For
+      more information, see storage classes.
+    timeCreated: The creation time of the bucket in RFC 3339 format.
+    updated: The modification time of the bucket in RFC 3339 format.
+    versioning: The bucket's versioning configuration.
+    website: The bucket's website configuration, controlling how the service
+      behaves when accessing bucket contents as a web site. See the Static
+      Website Examples for more information.
+  """
+  class BillingValue(_messages.Message):
+    r"""The bucket's billing configuration.
+
+    Fields:
+      requesterPays: When set to true, Requester Pays is enabled for this
+        bucket.
+    """
+
+    requesterPays = _messages.BooleanField(1)
+
+  class CorsValueListEntry(_messages.Message):
+    r"""A CorsValueListEntry object.
+
+    Fields:
+      maxAgeSeconds: The value, in seconds, to return in the  Access-Control-
+        Max-Age header used in preflight responses.
+      method: The list of HTTP methods on which to include CORS response
+        headers, (GET, OPTIONS, POST, etc) Note: "*" is permitted in the list
+        of methods, and means "any method".
+      origin: The list of Origins eligible to receive CORS response headers.
+        Note: "*" is permitted in the list of origins, and means "any Origin".
+      responseHeader: The list of HTTP headers other than the simple response
+        headers to give permission for the user-agent to share across domains.
+    """
+
+    maxAgeSeconds = _messages.IntegerField(1, variant=_messages.Variant.INT32)
+    method = _messages.StringField(2, repeated=True)
+    origin = _messages.StringField(3, repeated=True)
+    responseHeader = _messages.StringField(4, repeated=True)
+
+  class EncryptionValue(_messages.Message):
+    r"""Encryption configuration for a bucket.
+
+    Fields:
+      defaultKmsKeyName: A Cloud KMS key that will be used to encrypt objects
+        inserted into this bucket, if no encryption method is specified.
+    """
+
+    defaultKmsKeyName = _messages.StringField(1)
+
+  @encoding.MapUnrecognizedFields('additionalProperties')
+  class LabelsValue(_messages.Message):
+    r"""User-provided labels, in key/value pairs.
+
+    Messages:
+      AdditionalProperty: An additional property for a LabelsValue object.
+
+    Fields:
+      additionalProperties: An individual label entry.
+    """
+    class AdditionalProperty(_messages.Message):
+      r"""An additional property for a LabelsValue object.
+
+      Fields:
+        key: Name of the additional property.
+        value: A string attribute.
+      """
+
+      key = _messages.StringField(1)
+      value = _messages.StringField(2)
+
+    additionalProperties = _messages.MessageField(
+        'AdditionalProperty', 1, repeated=True)
+
+  class LifecycleValue(_messages.Message):
+    r"""The bucket's lifecycle configuration. See lifecycle management for
+    more information.
+
+    Messages:
+      RuleValueListEntry: A RuleValueListEntry object.
+
+    Fields:
+      rule: A lifecycle management rule, which is made of an action to take
+        and the condition(s) under which the action will be taken.
+    """
+    class RuleValueListEntry(_messages.Message):
+      r"""A RuleValueListEntry object.
+
+      Messages:
+        ActionValue: The action to take.
+        ConditionValue: The condition(s) under which the action will be taken.
+
+      Fields:
+        action: The action to take.
+        condition: The condition(s) under which the action will be taken.
+      """
+      class ActionValue(_messages.Message):
+        r"""The action to take.
+
+        Fields:
+          storageClass: Target storage class. Required iff the type of the
+            action is SetStorageClass.
+          type: Type of the action. Currently, only Delete and SetStorageClass
+            are supported.
+        """
+
+        storageClass = _messages.StringField(1)
+        type = _messages.StringField(2)
+
+      class ConditionValue(_messages.Message):
+        r"""The condition(s) under which the action will be taken.
+
+        Fields:
+          age: Age of an object (in days). This condition is satisfied when an
+            object reaches the specified age.
+          createdBefore: A date in RFC 3339 format with only the date part
+            (for instance, "2013-01-15"). This condition is satisfied when an
+            object is created before midnight of the specified date in UTC.
+          isLive: Relevant only for versioned objects. If the value is true,
+            this condition matches live objects; if the value is false, it
+            matches archived objects.
+          matchesPattern: A regular expression that satisfies the RE2 syntax.
+            This condition is satisfied when the name of the object matches
+            the RE2 pattern. Note: This feature is currently in the "Early
+            Access" launch stage and is only available to a whitelisted set of
+            users; that means that this feature may be changed in backward-
+            incompatible ways and that it is not guaranteed to be released.
+          matchesStorageClass: Objects having any of the storage classes
+            specified by this condition will be matched. Values include
+            MULTI_REGIONAL, REGIONAL, NEARLINE, COLDLINE, STANDARD, and
+            DURABLE_REDUCED_AVAILABILITY.
+          numNewerVersions: Relevant only for versioned objects. If the value
+            is N, this condition is satisfied when there are at least N
+            versions (including the live version) newer than this version of
+            the object.
+        """
+
+        age = _messages.IntegerField(1, variant=_messages.Variant.INT32)
+        createdBefore = extra_types.DateField(2)
+        isLive = _messages.BooleanField(3)
+        matchesPattern = _messages.StringField(4)
+        matchesStorageClass = _messages.StringField(5, repeated=True)
+        numNewerVersions = _messages.IntegerField(
+            6, variant=_messages.Variant.INT32)
+
+      action = _messages.MessageField('ActionValue', 1)
+      condition = _messages.MessageField('ConditionValue', 2)
+
+    rule = _messages.MessageField('RuleValueListEntry', 1, repeated=True)
+
+  class LoggingValue(_messages.Message):
+    r"""The bucket's logging configuration, which defines the destination
+    bucket and optional name prefix for the current bucket's logs.
+
+    Fields:
+      logBucket: The destination bucket where the current bucket's logs should
+        be placed.
+      logObjectPrefix: A prefix for log object names.
+    """
+
+    logBucket = _messages.StringField(1)
+    logObjectPrefix = _messages.StringField(2)
+
+  class OwnerValue(_messages.Message):
+    r"""The owner of the bucket. This is always the project team's owner
+    group.
+
+    Fields:
+      entity: The entity, in the form project-owner-projectId.
+      entityId: The ID for the entity.
+    """
+
+    entity = _messages.StringField(1)
+    entityId = _messages.StringField(2)
+
+  class RetentionPolicyValue(_messages.Message):
+    r"""The bucket's retention policy. The retention policy enforces a minimum
+    retention time for all objects contained in the bucket, based on their
+    creation time. Any attempt to overwrite or delete objects younger than the
+    retention period will result in a PERMISSION_DENIED error. An unlocked
+    retention policy can be modified or removed from the bucket via a
+    storage.buckets.update operation. A locked retention policy cannot be
+    removed or shortened in duration for the lifetime of the bucket.
+    Attempting to remove or decrease period of a locked retention policy will
+    result in a PERMISSION_DENIED error.
+
+    Fields:
+      effectiveTime: Server-determined value that indicates the time from
+        which policy was enforced and effective. This value is in RFC 3339
+        format.
+      isLocked: Once locked, an object retention policy cannot be modified.
+      retentionPeriod: The duration in seconds that objects need to be
+        retained. Retention duration must be greater than zero and less than
+        100 years. Note that enforcement of retention periods less than a day
+        is not guaranteed. Such periods should only be used for testing
+        purposes.
+    """
+
+    effectiveTime = _message_types.DateTimeField(1)
+    isLocked = _messages.BooleanField(2)
+    retentionPeriod = _messages.IntegerField(3)
+
+  class VersioningValue(_messages.Message):
+    r"""The bucket's versioning configuration.
+
+    Fields:
+      enabled: While set to true, versioning is fully enabled for this bucket.
+    """
+
+    enabled = _messages.BooleanField(1)
+
+  class WebsiteValue(_messages.Message):
+    r"""The bucket's website configuration, controlling how the service
+    behaves when accessing bucket contents as a web site. See the Static
+    Website Examples for more information.
+
+    Fields:
+      mainPageSuffix: If the requested object path is missing, the service
+        will ensure the path has a trailing '/', append this suffix, and
+        attempt to retrieve the resulting object. This allows the creation of
+        index.html objects to represent directory pages.
+      notFoundPage: If the requested object path is missing, and any
+        mainPageSuffix object is missing, if applicable, the service will
+        return the named object from this bucket as the content for a 404 Not
+        Found result.
+    """
+
+    mainPageSuffix = _messages.StringField(1)
+    notFoundPage = _messages.StringField(2)
+
+  acl = _messages.MessageField('BucketAccessControl', 1, repeated=True)
+  billing = _messages.MessageField('BillingValue', 2)
+  cors = _messages.MessageField('CorsValueListEntry', 3, repeated=True)
+  defaultEventBasedHold = _messages.BooleanField(4)
+  defaultObjectAcl = _messages.MessageField(
+      'ObjectAccessControl', 5, repeated=True)
+  encryption = _messages.MessageField('EncryptionValue', 6)
+  etag = _messages.StringField(7)
+  id = _messages.StringField(8)
+  kind = _messages.StringField(9, default=u'storage#bucket')
+  labels = _messages.MessageField('LabelsValue', 10)
+  lifecycle = _messages.MessageField('LifecycleValue', 11)
+  location = _messages.StringField(12)
+  logging = _messages.MessageField('LoggingValue', 13)
+  metageneration = _messages.IntegerField(14)
+  name = _messages.StringField(15)
+  owner = _messages.MessageField('OwnerValue', 16)
+  projectNumber = _messages.IntegerField(17, variant=_messages.Variant.UINT64)
+  retentionPolicy = _messages.MessageField('RetentionPolicyValue', 18)
+  selfLink = _messages.StringField(19)
+  storageClass = _messages.StringField(20)
+  timeCreated = _message_types.DateTimeField(21)
+  updated = _message_types.DateTimeField(22)
+  versioning = _messages.MessageField('VersioningValue', 23)
+  website = _messages.MessageField('WebsiteValue', 24)
+
+
+class BucketAccessControl(_messages.Message):
+  r"""An access-control entry.
+
+  Messages:
+    ProjectTeamValue: The project team associated with the entity, if any.
+
+  Fields:
+    bucket: The name of the bucket.
+    domain: The domain associated with the entity, if any.
+    email: The email address associated with the entity, if any.
+    entity: The entity holding the permission, in one of the following forms:
+      - user-userId  - user-email  - group-groupId  - group-email  - domain-
+      domain  - project-team-projectId  - allUsers  - allAuthenticatedUsers
+      Examples:  - The user liz@example.com would be user-liz@example.com.  -
+      The group example@googlegroups.com would be group-
+      example@googlegroups.com.  - To refer to all members of the Google Apps
+      for Business domain example.com, the entity would be domain-example.com.
+    entityId: The ID for the entity, if any.
+    etag: HTTP 1.1 Entity tag for the access-control entry.
+    id: The ID of the access-control entry.
+    kind: The kind of item this is. For bucket access control entries, this is
+      always storage#bucketAccessControl.
+    projectTeam: The project team associated with the entity, if any.
+    role: The access permission for the entity.
+    selfLink: The link to this access-control entry.
+  """
+  class ProjectTeamValue(_messages.Message):
+    r"""The project team associated with the entity, if any.
+
+    Fields:
+      projectNumber: The project number.
+      team: The team.
+    """
+
+    projectNumber = _messages.StringField(1)
+    team = _messages.StringField(2)
+
+  bucket = _messages.StringField(1)
+  domain = _messages.StringField(2)
+  email = _messages.StringField(3)
+  entity = _messages.StringField(4)
+  entityId = _messages.StringField(5)
+  etag = _messages.StringField(6)
+  id = _messages.StringField(7)
+  kind = _messages.StringField(8, default=u'storage#bucketAccessControl')
+  projectTeam = _messages.MessageField('ProjectTeamValue', 9)
+  role = _messages.StringField(10)
+  selfLink = _messages.StringField(11)
+
+
+class BucketAccessControls(_messages.Message):
+  r"""An access-control list.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of bucket access control
+      entries, this is always storage#bucketAccessControls.
+  """
+
+  items = _messages.MessageField('BucketAccessControl', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#bucketAccessControls')
+
+
+class Buckets(_messages.Message):
+  r"""A list of buckets.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of buckets, this is always
+      storage#buckets.
+    nextPageToken: The continuation token, used to page through large result
+      sets. Provide this value in a subsequent request to return the next page
+      of results.
+  """
+
+  items = _messages.MessageField('Bucket', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#buckets')
+  nextPageToken = _messages.StringField(3)
+
+
+class Channel(_messages.Message):
+  r"""An notification channel used to watch for resource changes.
+
+  Messages:
+    ParamsValue: Additional parameters controlling delivery channel behavior.
+      Optional.
+
+  Fields:
+    address: The address where notifications are delivered for this channel.
+    expiration: Date and time of notification channel expiration, expressed as
+      a Unix timestamp, in milliseconds. Optional.
+    id: A UUID or similar unique string that identifies this channel.
+    kind: Identifies this as a notification channel used to watch for changes
+      to a resource. Value: the fixed string "api#channel".
+    params: Additional parameters controlling delivery channel behavior.
+      Optional.
+    payload: A Boolean value to indicate whether payload is wanted. Optional.
+    resourceId: An opaque ID that identifies the resource being watched on
+      this channel. Stable across different API versions.
+    resourceUri: A version-specific identifier for the watched resource.
+    token: An arbitrary string delivered to the target address with each
+      notification delivered over this channel. Optional.
+    type: The type of delivery mechanism used for this channel.
+  """
+  @encoding.MapUnrecognizedFields('additionalProperties')
+  class ParamsValue(_messages.Message):
+    r"""Additional parameters controlling delivery channel behavior. Optional.
+
+    Messages:
+      AdditionalProperty: An additional property for a ParamsValue object.
+
+    Fields:
+      additionalProperties: Declares a new parameter by name.
+    """
+    class AdditionalProperty(_messages.Message):
+      r"""An additional property for a ParamsValue object.
+
+      Fields:
+        key: Name of the additional property.
+        value: A string attribute.
+      """
+
+      key = _messages.StringField(1)
+      value = _messages.StringField(2)
+
+    additionalProperties = _messages.MessageField(
+        'AdditionalProperty', 1, repeated=True)
+
+  address = _messages.StringField(1)
+  expiration = _messages.IntegerField(2)
+  id = _messages.StringField(3)
+  kind = _messages.StringField(4, default=u'api#channel')
+  params = _messages.MessageField('ParamsValue', 5)
+  payload = _messages.BooleanField(6)
+  resourceId = _messages.StringField(7)
+  resourceUri = _messages.StringField(8)
+  token = _messages.StringField(9)
+  type = _messages.StringField(10)
+
+
+class ComposeRequest(_messages.Message):
+  r"""A Compose request.
+
+  Messages:
+    SourceObjectsValueListEntry: A SourceObjectsValueListEntry object.
+
+  Fields:
+    destination: Properties of the resulting object.
+    kind: The kind of item this is.
+    sourceObjects: The list of source objects that will be concatenated into a
+      single object.
+  """
+  class SourceObjectsValueListEntry(_messages.Message):
+    r"""A SourceObjectsValueListEntry object.
+
+    Messages:
+      ObjectPreconditionsValue: Conditions that must be met for this operation
+        to execute.
+
+    Fields:
+      generation: The generation of this object to use as the source.
+      name: The source object's name. All source objects must reside in the
+        same bucket.
+      objectPreconditions: Conditions that must be met for this operation to
+        execute.
+    """
+    class ObjectPreconditionsValue(_messages.Message):
+      r"""Conditions that must be met for this operation to execute.
+
+      Fields:
+        ifGenerationMatch: Only perform the composition if the generation of
+          the source object that would be used matches this value. If this
+          value and a generation are both specified, they must be the same
+          value or the call will fail.
+      """
+
+      ifGenerationMatch = _messages.IntegerField(1)
+
+    generation = _messages.IntegerField(1)
+    name = _messages.StringField(2)
+    objectPreconditions = _messages.MessageField('ObjectPreconditionsValue', 3)
+
+  destination = _messages.MessageField('Object', 1)
+  kind = _messages.StringField(2, default=u'storage#composeRequest')
+  sourceObjects = _messages.MessageField(
+      'SourceObjectsValueListEntry', 3, repeated=True)
+
+
+class Notification(_messages.Message):
+  r"""A subscription to receive Google PubSub notifications.
+
+  Messages:
+    CustomAttributesValue: An optional list of additional attributes to attach
+      to each Cloud PubSub message published for this notification
+      subscription.
+
+  Fields:
+    custom_attributes: An optional list of additional attributes to attach to
+      each Cloud PubSub message published for this notification subscription.
+    etag: HTTP 1.1 Entity tag for this subscription notification.
+    event_types: If present, only send notifications about listed event types.
+      If empty, sent notifications for all event types.
+    id: The ID of the notification.
+    kind: The kind of item this is. For notifications, this is always
+      storage#notification.
+    object_name_prefix: If present, only apply this notification configuration
+      to object names that begin with this prefix.
+    payload_format: The desired content of the Payload.
+    selfLink: The canonical URL of this notification.
+    topic: The Cloud PubSub topic to which this subscription publishes.
+      Formatted as: '//pubsub.googleapis.com/projects/{project-
+      identifier}/topics/{my-topic}'
+  """
+  @encoding.MapUnrecognizedFields('additionalProperties')
+  class CustomAttributesValue(_messages.Message):
+    r"""An optional list of additional attributes to attach to each Cloud
+    PubSub message published for this notification subscription.
+
+    Messages:
+      AdditionalProperty: An additional property for a CustomAttributesValue
+        object.
+
+    Fields:
+      additionalProperties: Additional properties of type
+        CustomAttributesValue
+    """
+    class AdditionalProperty(_messages.Message):
+      r"""An additional property for a CustomAttributesValue object.
+
+      Fields:
+        key: Name of the additional property.
+        value: A string attribute.
+      """
+
+      key = _messages.StringField(1)
+      value = _messages.StringField(2)
+
+    additionalProperties = _messages.MessageField(
+        'AdditionalProperty', 1, repeated=True)
+
+  custom_attributes = _messages.MessageField('CustomAttributesValue', 1)
+  etag = _messages.StringField(2)
+  event_types = _messages.StringField(3, repeated=True)
+  id = _messages.StringField(4)
+  kind = _messages.StringField(5, default=u'storage#notification')
+  object_name_prefix = _messages.StringField(6)
+  payload_format = _messages.StringField(7, default=u'JSON_API_V1')
+  selfLink = _messages.StringField(8)
+  topic = _messages.StringField(9)
+
+
+class Notifications(_messages.Message):
+  r"""A list of notification subscriptions.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of notifications, this is always
+      storage#notifications.
+  """
+
+  items = _messages.MessageField('Notification', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#notifications')
+
+
+class Object(_messages.Message):
+  r"""An object.
+
+  Messages:
+    CustomerEncryptionValue: Metadata of customer-supplied encryption key, if
+      the object is encrypted by such a key.
+    MetadataValue: User-provided metadata, in key/value pairs.
+    OwnerValue: The owner of the object. This will always be the uploader of
+      the object.
+
+  Fields:
+    acl: Access controls on the object.
+    bucket: The name of the bucket containing this object.
+    cacheControl: Cache-Control directive for the object data. If omitted, and
+      the object is accessible to all anonymous users, the default will be
+      public, max-age=3600.
+    componentCount: Number of underlying components that make up this object.
+      Components are accumulated by compose operations.
+    contentDisposition: Content-Disposition of the object data.
+    contentEncoding: Content-Encoding of the object data.
+    contentLanguage: Content-Language of the object data.
+    contentType: Content-Type of the object data. If an object is stored
+      without a Content-Type, it is served as application/octet-stream.
+    crc32c: CRC32c checksum, as described in RFC 4960, Appendix B; encoded
+      using base64 in big-endian byte order. For more information about using
+      the CRC32c checksum, see Hashes and ETags: Best Practices.
+    customerEncryption: Metadata of customer-supplied encryption key, if the
+      object is encrypted by such a key.
+    etag: HTTP 1.1 Entity tag for the object.
+    eventBasedHold: Whether an object is under event-based hold. Event-based
+      hold is a way to retain objects until an event occurs, which is
+      signified by the hold's release (i.e. this value is set to false). After
+      being released (set to false), such objects will be subject to bucket-
+      level retention (if any). One sample use case of this flag is for banks
+      to hold loan documents for at least 3 years after loan is paid in full.
+      Here, bucket-level retention is 3 years and the event is the loan being
+      paid in full. In this example, these objects will be held intact for any
+      number of years until the event has occurred (event-based hold on the
+      object is released) and then 3 more years after that. That means
+      retention duration of the objects begins from the moment event-based
+      hold transitioned from true to false.
+    generation: The content generation of this object. Used for object
+      versioning.
+    id: The ID of the object, including the bucket name, object name, and
+      generation number.
+    kind: The kind of item this is. For objects, this is always
+      storage#object.
+    kmsKeyName: Cloud KMS Key used to encrypt this object, if the object is
+      encrypted by such a key.
+    md5Hash: MD5 hash of the data; encoded using base64. For more information
+      about using the MD5 hash, see Hashes and ETags: Best Practices.
+    mediaLink: Media download link.
+    metadata: User-provided metadata, in key/value pairs.
+    metageneration: The version of the metadata for this object at this
+      generation. Used for preconditions and for detecting changes in
+      metadata. A metageneration number is only meaningful in the context of a
+      particular generation of a particular object.
+    name: The name of the object. Required if not specified by URL parameter.
+    owner: The owner of the object. This will always be the uploader of the
+      object.
+    retentionExpirationTime: A server-determined value that specifies the
+      earliest time that the object's retention period expires. This value is
+      in RFC 3339 format. Note 1: This field is not provided for objects with
+      an active event-based hold, since retention expiration is unknown until
+      the hold is removed. Note 2: This value can be provided even when
+      temporary hold is set (so that the user can reason about policy without
+      having to first unset the temporary hold).
+    selfLink: The link to this object.
+    size: Content-Length of the data in bytes.
+    storageClass: Storage class of the object.
+    temporaryHold: Whether an object is under temporary hold. While this flag
+      is set to true, the object is protected against deletion and overwrites.
+      A common use case of this flag is regulatory investigations where
+      objects need to be retained while the investigation is ongoing. Note
+      that unlike event-based hold, temporary hold does not impact retention
+      expiration time of an object.
+    timeCreated: The creation time of the object in RFC 3339 format.
+    timeDeleted: The deletion time of the object in RFC 3339 format. Will be
+      returned if and only if this version of the object has been deleted.
+    timeStorageClassUpdated: The time at which the object's storage class was
+      last changed. When the object is initially created, it will be set to
+      timeCreated.
+    updated: The modification time of the object metadata in RFC 3339 format.
+  """
+  class CustomerEncryptionValue(_messages.Message):
+    r"""Metadata of customer-supplied encryption key, if the object is
+    encrypted by such a key.
+
+    Fields:
+      encryptionAlgorithm: The encryption algorithm.
+      keySha256: SHA256 hash value of the encryption key.
+    """
+
+    encryptionAlgorithm = _messages.StringField(1)
+    keySha256 = _messages.StringField(2)
+
+  @encoding.MapUnrecognizedFields('additionalProperties')
+  class MetadataValue(_messages.Message):
+    r"""User-provided metadata, in key/value pairs.
+
+    Messages:
+      AdditionalProperty: An additional property for a MetadataValue object.
+
+    Fields:
+      additionalProperties: An individual metadata entry.
+    """
+    class AdditionalProperty(_messages.Message):
+      r"""An additional property for a MetadataValue object.
+
+      Fields:
+        key: Name of the additional property.
+        value: A string attribute.
+      """
+
+      key = _messages.StringField(1)
+      value = _messages.StringField(2)
+
+    additionalProperties = _messages.MessageField(
+        'AdditionalProperty', 1, repeated=True)
+
+  class OwnerValue(_messages.Message):
+    r"""The owner of the object. This will always be the uploader of the
+    object.
+
+    Fields:
+      entity: The entity, in the form user-userId.
+      entityId: The ID for the entity.
+    """
+
+    entity = _messages.StringField(1)
+    entityId = _messages.StringField(2)
+
+  acl = _messages.MessageField('ObjectAccessControl', 1, repeated=True)
+  bucket = _messages.StringField(2)
+  cacheControl = _messages.StringField(3)
+  componentCount = _messages.IntegerField(4, variant=_messages.Variant.INT32)
+  contentDisposition = _messages.StringField(5)
+  contentEncoding = _messages.StringField(6)
+  contentLanguage = _messages.StringField(7)
+  contentType = _messages.StringField(8)
+  crc32c = _messages.StringField(9)
+  customerEncryption = _messages.MessageField('CustomerEncryptionValue', 10)
+  etag = _messages.StringField(11)
+  eventBasedHold = _messages.BooleanField(12)
+  generation = _messages.IntegerField(13)
+  id = _messages.StringField(14)
+  kind = _messages.StringField(15, default=u'storage#object')
+  kmsKeyName = _messages.StringField(16)
+  md5Hash = _messages.StringField(17)
+  mediaLink = _messages.StringField(18)
+  metadata = _messages.MessageField('MetadataValue', 19)
+  metageneration = _messages.IntegerField(20)
+  name = _messages.StringField(21)
+  owner = _messages.MessageField('OwnerValue', 22)
+  retentionExpirationTime = _message_types.DateTimeField(23)
+  selfLink = _messages.StringField(24)
+  size = _messages.IntegerField(25, variant=_messages.Variant.UINT64)
+  storageClass = _messages.StringField(26)
+  temporaryHold = _messages.BooleanField(27)
+  timeCreated = _message_types.DateTimeField(28)
+  timeDeleted = _message_types.DateTimeField(29)
+  timeStorageClassUpdated = _message_types.DateTimeField(30)
+  updated = _message_types.DateTimeField(31)
+
+
+class ObjectAccessControl(_messages.Message):
+  r"""An access-control entry.
+
+  Messages:
+    ProjectTeamValue: The project team associated with the entity, if any.
+
+  Fields:
+    bucket: The name of the bucket.
+    domain: The domain associated with the entity, if any.
+    email: The email address associated with the entity, if any.
+    entity: The entity holding the permission, in one of the following forms:
+      - user-userId  - user-email  - group-groupId  - group-email  - domain-
+      domain  - project-team-projectId  - allUsers  - allAuthenticatedUsers
+      Examples:  - The user liz@example.com would be user-liz@example.com.  -
+      The group example@googlegroups.com would be group-
+      example@googlegroups.com.  - To refer to all members of the Google Apps
+      for Business domain example.com, the entity would be domain-example.com.
+    entityId: The ID for the entity, if any.
+    etag: HTTP 1.1 Entity tag for the access-control entry.
+    generation: The content generation of the object, if applied to an object.
+    id: The ID of the access-control entry.
+    kind: The kind of item this is. For object access control entries, this is
+      always storage#objectAccessControl.
+    object: The name of the object, if applied to an object.
+    projectTeam: The project team associated with the entity, if any.
+    role: The access permission for the entity.
+    selfLink: The link to this access-control entry.
+  """
+  class ProjectTeamValue(_messages.Message):
+    r"""The project team associated with the entity, if any.
+
+    Fields:
+      projectNumber: The project number.
+      team: The team.
+    """
+
+    projectNumber = _messages.StringField(1)
+    team = _messages.StringField(2)
+
+  bucket = _messages.StringField(1)
+  domain = _messages.StringField(2)
+  email = _messages.StringField(3)
+  entity = _messages.StringField(4)
+  entityId = _messages.StringField(5)
+  etag = _messages.StringField(6)
+  generation = _messages.IntegerField(7)
+  id = _messages.StringField(8)
+  kind = _messages.StringField(9, default=u'storage#objectAccessControl')
+  object = _messages.StringField(10)
+  projectTeam = _messages.MessageField('ProjectTeamValue', 11)
+  role = _messages.StringField(12)
+  selfLink = _messages.StringField(13)
+
+
+class ObjectAccessControls(_messages.Message):
+  r"""An access-control list.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of object access control
+      entries, this is always storage#objectAccessControls.
+  """
+
+  items = _messages.MessageField('ObjectAccessControl', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#objectAccessControls')
+
+
+class Objects(_messages.Message):
+  r"""A list of objects.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of objects, this is always
+      storage#objects.
+    nextPageToken: The continuation token, used to page through large result
+      sets. Provide this value in a subsequent request to return the next page
+      of results.
+    prefixes: The list of prefixes of objects matching-but-not-listed up to
+      and including the requested delimiter.
+  """
+
+  items = _messages.MessageField('Object', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#objects')
+  nextPageToken = _messages.StringField(3)
+  prefixes = _messages.StringField(4, repeated=True)
+
+
+class Policy(_messages.Message):
+  r"""A bucket/object IAM policy.
+
+  Messages:
+    BindingsValueListEntry: A BindingsValueListEntry object.
+
+  Fields:
+    bindings: An association between a role, which comes with a set of
+      permissions, and members who may assume that role.
+    etag: HTTP 1.1  Entity tag for the policy.
+    kind: The kind of item this is. For policies, this is always
+      storage#policy. This field is ignored on input.
+    resourceId: The ID of the resource to which this policy belongs. Will be
+      of the form projects/_/buckets/bucket for buckets, and
+      projects/_/buckets/bucket/objects/object for objects. A specific
+      generation may be specified by appending #generationNumber to the end of
+      the object name, e.g. projects/_/buckets/my-bucket/objects/data.txt#17.
+      The current generation can be denoted with #0. This field is ignored on
+      input.
+  """
+  class BindingsValueListEntry(_messages.Message):
+    r"""A BindingsValueListEntry object.
+
+    Fields:
+      condition: A extra_types.JsonValue attribute.
+      members: A collection of identifiers for members who may assume the
+        provided role. Recognized identifiers are as follows:   - allUsers - A
+        special identifier that represents anyone on the internet; with or
+        without a Google account.   - allAuthenticatedUsers - A special
+        identifier that represents anyone who is authenticated with a Google
+        account or a service account.   - user:emailid - An email address that
+        represents a specific account. For example, user:alice@gmail.com or
+        user:joe@example.com.   - serviceAccount:emailid - An email address
+        that represents a service account. For example,  serviceAccount:my-
+        other-app@appspot.gserviceaccount.com .   - group:emailid - An email
+        address that represents a Google group. For example,
+        group:admins@example.com.   - domain:domain - A Google Apps domain
+        name that represents all the users of that domain. For example,
+        domain:google.com or domain:example.com.   - projectOwner:projectid -
+        Owners of the given project. For example, projectOwner:my-example-
+        project   - projectEditor:projectid - Editors of the given project.
+        For example, projectEditor:my-example-project   -
+        projectViewer:projectid - Viewers of the given project. For example,
+        projectViewer:my-example-project
+      role: The role to which members belong. Two types of roles are
+        supported: new IAM roles, which grant permissions that do not map
+        directly to those provided by ACLs, and legacy IAM roles, which do map
+        directly to ACL permissions. All roles are of the format
+        roles/storage.specificRole. The new IAM roles are:   -
+        roles/storage.admin - Full control of Google Cloud Storage resources.
+        - roles/storage.objectViewer - Read-Only access to Google Cloud
+        Storage objects.   - roles/storage.objectCreator - Access to create
+        objects in Google Cloud Storage.   - roles/storage.objectAdmin - Full
+        control of Google Cloud Storage objects.   The legacy IAM roles are:
+        - roles/storage.legacyObjectReader - Read-only access to objects
+        without listing. Equivalent to an ACL entry on an object with the
+        READER role.   - roles/storage.legacyObjectOwner - Read/write access
+        to existing objects without listing. Equivalent to an ACL entry on an
+        object with the OWNER role.   - roles/storage.legacyBucketReader -
+        Read access to buckets with object listing. Equivalent to an ACL entry
+        on a bucket with the READER role.   - roles/storage.legacyBucketWriter
+        - Read access to buckets with object listing/creation/deletion.
+        Equivalent to an ACL entry on a bucket with the WRITER role.   -
+        roles/storage.legacyBucketOwner - Read and write access to existing
+        buckets with object listing/creation/deletion. Equivalent to an ACL
+        entry on a bucket with the OWNER role.
+    """
+
+    condition = _messages.MessageField('extra_types.JsonValue', 1)
+    members = _messages.StringField(2, repeated=True)
+    role = _messages.StringField(3)
+
+  bindings = _messages.MessageField('BindingsValueListEntry', 1, repeated=True)
+  etag = _messages.BytesField(2)
+  kind = _messages.StringField(3, default=u'storage#policy')
+  resourceId = _messages.StringField(4)
+
+
+class RewriteResponse(_messages.Message):
+  r"""A rewrite response.
+
+  Fields:
+    done: true if the copy is finished; otherwise, false if the copy is in
+      progress. This property is always present in the response.
+    kind: The kind of item this is.
+    objectSize: The total size of the object being copied in bytes. This
+      property is always present in the response.
+    resource: A resource containing the metadata for the copied-to object.
+      This property is present in the response only when copying completes.
+    rewriteToken: A token to use in subsequent requests to continue copying
+      data. This token is present in the response only when there is more data
+      to copy.
+    totalBytesRewritten: The total bytes written so far, which can be used to
+      provide a waiting user with a progress indicator. This property is
+      always present in the response.
+  """
+
+  done = _messages.BooleanField(1)
+  kind = _messages.StringField(2, default=u'storage#rewriteResponse')
+  objectSize = _messages.IntegerField(3)
+  resource = _messages.MessageField('Object', 4)
+  rewriteToken = _messages.StringField(5)
+  totalBytesRewritten = _messages.IntegerField(6)
+
+
+class ServiceAccount(_messages.Message):
+  r"""A subscription to receive Google PubSub notifications.
+
+  Fields:
+    email_address: The ID of the notification.
+    kind: The kind of item this is. For notifications, this is always
+      storage#notification.
+  """
+
+  email_address = _messages.StringField(1)
+  kind = _messages.StringField(2, default=u'storage#serviceAccount')
+
+
+class StandardQueryParameters(_messages.Message):
+  r"""Query parameters accepted by all methods.
+
+  Enums:
+    AltValueValuesEnum: Data format for the response.
+
+  Fields:
+    alt: Data format for the response.
+    fields: Selector specifying which fields to include in a partial response.
+    key: API key. Your API key identifies your project and provides you with
+      API access, quota, and reports. Required unless you provide an OAuth 2.0
+      token.
+    oauth_token: OAuth 2.0 token for the current user.
+    prettyPrint: Returns response with indentations and line breaks.
+    quotaUser: An opaque string that represents a user for quota purposes.
+      Must not exceed 40 characters.
+    trace: A tracing token of the form "token:<tokenid>" to include in api
+      requests.
+    userIp: Deprecated. Please use quotaUser instead.
+  """
+  class AltValueValuesEnum(_messages.Enum):
+    r"""Data format for the response.
+
+    Values:
+      json: Responses with Content-Type of application/json
+    """
+    json = 0
+
+  alt = _messages.EnumField('AltValueValuesEnum', 1, default=u'json')
+  fields = _messages.StringField(2)
+  key = _messages.StringField(3)
+  oauth_token = _messages.StringField(4)
+  prettyPrint = _messages.BooleanField(5, default=True)
+  quotaUser = _messages.StringField(6)
+  trace = _messages.StringField(7)
+  userIp = _messages.StringField(8)
+
+
+class StorageBucketAccessControlsDeleteRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsDeleteRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketAccessControlsDeleteResponse(_messages.Message):
+  r"""An empty StorageBucketAccessControlsDelete response."""
+
+
+class StorageBucketAccessControlsGetRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsGetRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketAccessControlsInsertRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsInsertRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketAccessControl: A BucketAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  bucketAccessControl = _messages.MessageField('BucketAccessControl', 2)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketAccessControlsListRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsListRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  userProject = _messages.StringField(2)
+
+
+class StorageBucketAccessControlsPatchRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsPatchRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketAccessControl: A BucketAccessControl resource to be passed as the
+      request body.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  bucketAccessControl = _messages.MessageField('BucketAccessControl', 2)
+  entity = _messages.StringField(3, required=True)
+  userProject = _messages.StringField(4)
+
+
+class StorageBucketAccessControlsUpdateRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsUpdateRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketAccessControl: A BucketAccessControl resource to be passed as the
+      request body.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  bucketAccessControl = _messages.MessageField('BucketAccessControl', 2)
+  entity = _messages.StringField(3, required=True)
+  userProject = _messages.StringField(4)
+
+
+class StorageBucketsDeleteRequest(_messages.Message):
+  r"""A StorageBucketsDeleteRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    ifMetagenerationMatch: If set, only deletes the bucket if its
+      metageneration matches this value.
+    ifMetagenerationNotMatch: If set, only deletes the bucket if its
+      metageneration does not match this value.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  ifMetagenerationMatch = _messages.IntegerField(2)
+  ifMetagenerationNotMatch = _messages.IntegerField(3)
+  userProject = _messages.StringField(4)
+
+
+class StorageBucketsDeleteResponse(_messages.Message):
+  r"""An empty StorageBucketsDelete response."""
+
+
+class StorageBucketsGetIamPolicyRequest(_messages.Message):
+  r"""A StorageBucketsGetIamPolicyRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  userProject = _messages.StringField(2)
+
+
+class StorageBucketsGetRequest(_messages.Message):
+  r"""A StorageBucketsGetRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    bucket: Name of a bucket.
+    ifMetagenerationMatch: Makes the return of the bucket metadata conditional
+      on whether the bucket's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the return of the bucket metadata
+      conditional on whether the bucket's current metageneration does not
+      match the given value.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  ifMetagenerationMatch = _messages.IntegerField(2)
+  ifMetagenerationNotMatch = _messages.IntegerField(3)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 4)
+  userProject = _messages.StringField(5)
+
+
+class StorageBucketsInsertRequest(_messages.Message):
+  r"""A StorageBucketsInsertRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this bucket.
+    PredefinedDefaultObjectAclValueValuesEnum: Apply a predefined set of
+      default object access controls to this bucket.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl,
+      unless the bucket resource specifies acl or defaultObjectAcl properties,
+      when it defaults to full.
+
+  Fields:
+    bucket: A Bucket resource to be passed as the request body.
+    predefinedAcl: Apply a predefined set of access controls to this bucket.
+    predefinedDefaultObjectAcl: Apply a predefined set of default object
+      access controls to this bucket.
+    project: A valid API project identifier.
+    projection: Set of properties to return. Defaults to noAcl, unless the
+      bucket resource specifies acl or defaultObjectAcl properties, when it
+      defaults to full.
+    userProject: The project to be billed for this request.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this bucket.
+
+    Values:
+      authenticatedRead: Project team owners get OWNER access, and
+        allAuthenticatedUsers get READER access.
+      private: Project team owners get OWNER access.
+      projectPrivate: Project team members get access according to their
+        roles.
+      publicRead: Project team owners get OWNER access, and allUsers get
+        READER access.
+      publicReadWrite: Project team owners get OWNER access, and allUsers get
+        WRITER access.
+    """
+    authenticatedRead = 0
+    private = 1
+    projectPrivate = 2
+    publicRead = 3
+    publicReadWrite = 4
+
+  class PredefinedDefaultObjectAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of default object access controls to this
+    bucket.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl, unless the bucket
+    resource specifies acl or defaultObjectAcl properties, when it defaults to
+    full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.MessageField('Bucket', 1)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 2)
+  predefinedDefaultObjectAcl = _messages.EnumField(
+      'PredefinedDefaultObjectAclValueValuesEnum', 3)
+  project = _messages.StringField(4, required=True)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 5)
+  userProject = _messages.StringField(6)
+
+
+class StorageBucketsListRequest(_messages.Message):
+  r"""A StorageBucketsListRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    maxResults: Maximum number of buckets to return in a single response. The
+      service will use this parameter or 1,000 items, whichever is smaller.
+    pageToken: A previously-returned page token representing part of the
+      larger set of results to view.
+    prefix: Filter results to buckets whose names begin with this prefix.
+    project: A valid API project identifier.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  maxResults = _messages.IntegerField(
+      1, variant=_messages.Variant.UINT32, default=1000)
+  pageToken = _messages.StringField(2)
+  prefix = _messages.StringField(3)
+  project = _messages.StringField(4, required=True)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 5)
+  userProject = _messages.StringField(6)
+
+
+class StorageBucketsLockRetentionPolicyRequest(_messages.Message):
+  r"""A StorageBucketsLockRetentionPolicyRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    ifMetagenerationMatch: Makes the operation conditional on whether bucket's
+      current metageneration matches the given value.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  ifMetagenerationMatch = _messages.IntegerField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketsPatchRequest(_messages.Message):
+  r"""A StorageBucketsPatchRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this bucket.
+    PredefinedDefaultObjectAclValueValuesEnum: Apply a predefined set of
+      default object access controls to this bucket.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to full.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketResource: A Bucket resource to be passed as the request body.
+    ifMetagenerationMatch: Makes the return of the bucket metadata conditional
+      on whether the bucket's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the return of the bucket metadata
+      conditional on whether the bucket's current metageneration does not
+      match the given value.
+    predefinedAcl: Apply a predefined set of access controls to this bucket.
+    predefinedDefaultObjectAcl: Apply a predefined set of default object
+      access controls to this bucket.
+    projection: Set of properties to return. Defaults to full.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this bucket.
+
+    Values:
+      authenticatedRead: Project team owners get OWNER access, and
+        allAuthenticatedUsers get READER access.
+      private: Project team owners get OWNER access.
+      projectPrivate: Project team members get access according to their
+        roles.
+      publicRead: Project team owners get OWNER access, and allUsers get
+        READER access.
+      publicReadWrite: Project team owners get OWNER access, and allUsers get
+        WRITER access.
+    """
+    authenticatedRead = 0
+    private = 1
+    projectPrivate = 2
+    publicRead = 3
+    publicReadWrite = 4
+
+  class PredefinedDefaultObjectAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of default object access controls to this
+    bucket.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  bucketResource = _messages.MessageField('Bucket', 2)
+  ifMetagenerationMatch = _messages.IntegerField(3)
+  ifMetagenerationNotMatch = _messages.IntegerField(4)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 5)
+  predefinedDefaultObjectAcl = _messages.EnumField(
+      'PredefinedDefaultObjectAclValueValuesEnum', 6)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 7)
+  userProject = _messages.StringField(8)
+
+
+class StorageBucketsSetIamPolicyRequest(_messages.Message):
+  r"""A StorageBucketsSetIamPolicyRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    policy: A Policy resource to be passed as the request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  policy = _messages.MessageField('Policy', 2)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketsTestIamPermissionsRequest(_messages.Message):
+  r"""A StorageBucketsTestIamPermissionsRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    permissions: Permissions to test.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  permissions = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketsUpdateRequest(_messages.Message):
+  r"""A StorageBucketsUpdateRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this bucket.
+    PredefinedDefaultObjectAclValueValuesEnum: Apply a predefined set of
+      default object access controls to this bucket.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to full.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketResource: A Bucket resource to be passed as the request body.
+    ifMetagenerationMatch: Makes the return of the bucket metadata conditional
+      on whether the bucket's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the return of the bucket metadata
+      conditional on whether the bucket's current metageneration does not
+      match the given value.
+    predefinedAcl: Apply a predefined set of access controls to this bucket.
+    predefinedDefaultObjectAcl: Apply a predefined set of default object
+      access controls to this bucket.
+    projection: Set of properties to return. Defaults to full.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this bucket.
+
+    Values:
+      authenticatedRead: Project team owners get OWNER access, and
+        allAuthenticatedUsers get READER access.
+      private: Project team owners get OWNER access.
+      projectPrivate: Project team members get access according to their
+        roles.
+      publicRead: Project team owners get OWNER access, and allUsers get
+        READER access.
+      publicReadWrite: Project team owners get OWNER access, and allUsers get
+        WRITER access.
+    """
+    authenticatedRead = 0
+    private = 1
+    projectPrivate = 2
+    publicRead = 3
+    publicReadWrite = 4
+
+  class PredefinedDefaultObjectAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of default object access controls to this
+    bucket.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  bucketResource = _messages.MessageField('Bucket', 2)
+  ifMetagenerationMatch = _messages.IntegerField(3)
+  ifMetagenerationNotMatch = _messages.IntegerField(4)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 5)
+  predefinedDefaultObjectAcl = _messages.EnumField(
+      'PredefinedDefaultObjectAclValueValuesEnum', 6)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 7)
+  userProject = _messages.StringField(8)
+
+
+class StorageChannelsStopResponse(_messages.Message):
+  r"""An empty StorageChannelsStop response."""
+
+
+class StorageDefaultObjectAccessControlsDeleteRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsDeleteRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageDefaultObjectAccessControlsDeleteResponse(_messages.Message):
+  r"""An empty StorageDefaultObjectAccessControlsDelete response."""
+
+
+class StorageDefaultObjectAccessControlsGetRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsGetRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageDefaultObjectAccessControlsInsertRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsInsertRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 2)
+  userProject = _messages.StringField(3)
+
+
+class StorageDefaultObjectAccessControlsListRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsListRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    ifMetagenerationMatch: If present, only return default ACL listing if the
+      bucket's current metageneration matches this value.
+    ifMetagenerationNotMatch: If present, only return default ACL listing if
+      the bucket's current metageneration does not match the given value.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  ifMetagenerationMatch = _messages.IntegerField(2)
+  ifMetagenerationNotMatch = _messages.IntegerField(3)
+  userProject = _messages.StringField(4)
+
+
+class StorageDefaultObjectAccessControlsPatchRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsPatchRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 3)
+  userProject = _messages.StringField(4)
+
+
+class StorageDefaultObjectAccessControlsUpdateRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsUpdateRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 3)
+  userProject = _messages.StringField(4)
+
+
+class StorageNotificationsDeleteRequest(_messages.Message):
+  r"""A StorageNotificationsDeleteRequest object.
+
+  Fields:
+    bucket: The parent bucket of the notification.
+    notification: ID of the notification to delete.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  notification = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageNotificationsDeleteResponse(_messages.Message):
+  r"""An empty StorageNotificationsDelete response."""
+
+
+class StorageNotificationsGetRequest(_messages.Message):
+  r"""A StorageNotificationsGetRequest object.
+
+  Fields:
+    bucket: The parent bucket of the notification.
+    notification: Notification ID
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  notification = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageNotificationsInsertRequest(_messages.Message):
+  r"""A StorageNotificationsInsertRequest object.
+
+  Fields:
+    bucket: The parent bucket of the notification.
+    notification: A Notification resource to be passed as the request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  notification = _messages.MessageField('Notification', 2)
+  userProject = _messages.StringField(3)
+
+
+class StorageNotificationsListRequest(_messages.Message):
+  r"""A StorageNotificationsListRequest object.
+
+  Fields:
+    bucket: Name of a Google Cloud Storage bucket.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  userProject = _messages.StringField(2)
+
+
+class StorageObjectAccessControlsDeleteRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsDeleteRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  generation = _messages.IntegerField(3)
+  object = _messages.StringField(4, required=True)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectAccessControlsDeleteResponse(_messages.Message):
+  r"""An empty StorageObjectAccessControlsDelete response."""
+
+
+class StorageObjectAccessControlsGetRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsGetRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  generation = _messages.IntegerField(3)
+  object = _messages.StringField(4, required=True)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectAccessControlsInsertRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsInsertRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 4)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectAccessControlsListRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsListRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  userProject = _messages.StringField(4)
+
+
+class StorageObjectAccessControlsPatchRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsPatchRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  generation = _messages.IntegerField(3)
+  object = _messages.StringField(4, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 5)
+  userProject = _messages.StringField(6)
+
+
+class StorageObjectAccessControlsUpdateRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsUpdateRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  generation = _messages.IntegerField(3)
+  object = _messages.StringField(4, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 5)
+  userProject = _messages.StringField(6)
+
+
+class StorageObjectsComposeRequest(_messages.Message):
+  r"""A StorageObjectsComposeRequest object.
+
+  Enums:
+    DestinationPredefinedAclValueValuesEnum: Apply a predefined set of access
+      controls to the destination object.
+
+  Fields:
+    composeRequest: A ComposeRequest resource to be passed as the request
+      body.
+    destinationBucket: Name of the bucket containing the source objects. The
+      destination object is stored in this bucket.
+    destinationObject: Name of the new object. For information about how to
+      URL encode object names to be path safe, see Encoding URI Path Parts.
+    destinationPredefinedAcl: Apply a predefined set of access controls to the
+      destination object.
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    kmsKeyName: Resource name of the Cloud KMS key, of the form projects/my-
+      project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be
+      used to encrypt the object. Overrides the object metadata's kms_key_name
+      value, if any.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class DestinationPredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to the destination object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  composeRequest = _messages.MessageField('ComposeRequest', 1)
+  destinationBucket = _messages.StringField(2, required=True)
+  destinationObject = _messages.StringField(3, required=True)
+  destinationPredefinedAcl = _messages.EnumField(
+      'DestinationPredefinedAclValueValuesEnum', 4)
+  ifGenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationMatch = _messages.IntegerField(6)
+  kmsKeyName = _messages.StringField(7)
+  userProject = _messages.StringField(8)
+
+
+class StorageObjectsCopyRequest(_messages.Message):
+  r"""A StorageObjectsCopyRequest object.
+
+  Enums:
+    DestinationPredefinedAclValueValuesEnum: Apply a predefined set of access
+      controls to the destination object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl,
+      unless the object resource specifies the acl property, when it defaults
+      to full.
+
+  Fields:
+    destinationBucket: Name of the bucket in which to store the new object.
+      Overrides the provided object metadata's bucket value, if any.For
+      information about how to URL encode object names to be path safe, see
+      Encoding URI Path Parts.
+    destinationObject: Name of the new object. Required when the object
+      metadata is not otherwise provided. Overrides the object metadata's name
+      value, if any.
+    destinationPredefinedAcl: Apply a predefined set of access controls to the
+      destination object.
+    ifGenerationMatch: Makes the operation conditional on whether the
+      destination object's current generation matches the given value. Setting
+      to 0 makes the operation succeed only if there are no live versions of
+      the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      destination object's current generation does not match the given value.
+      If no live object exists, the precondition fails. Setting to 0 makes the
+      operation succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      destination object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      destination object's current metageneration does not match the given
+      value.
+    ifSourceGenerationMatch: Makes the operation conditional on whether the
+      source object's current generation matches the given value.
+    ifSourceGenerationNotMatch: Makes the operation conditional on whether the
+      source object's current generation does not match the given value.
+    ifSourceMetagenerationMatch: Makes the operation conditional on whether
+      the source object's current metageneration matches the given value.
+    ifSourceMetagenerationNotMatch: Makes the operation conditional on whether
+      the source object's current metageneration does not match the given
+      value.
+    object: A Object resource to be passed as the request body.
+    projection: Set of properties to return. Defaults to noAcl, unless the
+      object resource specifies the acl property, when it defaults to full.
+    sourceBucket: Name of the bucket in which to find the source object.
+    sourceGeneration: If present, selects a specific revision of the source
+      object (as opposed to the latest version, the default).
+    sourceObject: Name of the source object. For information about how to URL
+      encode object names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class DestinationPredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to the destination object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl, unless the object
+    resource specifies the acl property, when it defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  destinationBucket = _messages.StringField(1, required=True)
+  destinationObject = _messages.StringField(2, required=True)
+  destinationPredefinedAcl = _messages.EnumField(
+      'DestinationPredefinedAclValueValuesEnum', 3)
+  ifGenerationMatch = _messages.IntegerField(4)
+  ifGenerationNotMatch = _messages.IntegerField(5)
+  ifMetagenerationMatch = _messages.IntegerField(6)
+  ifMetagenerationNotMatch = _messages.IntegerField(7)
+  ifSourceGenerationMatch = _messages.IntegerField(8)
+  ifSourceGenerationNotMatch = _messages.IntegerField(9)
+  ifSourceMetagenerationMatch = _messages.IntegerField(10)
+  ifSourceMetagenerationNotMatch = _messages.IntegerField(11)
+  object = _messages.MessageField('Object', 12)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 13)
+  sourceBucket = _messages.StringField(14, required=True)
+  sourceGeneration = _messages.IntegerField(15)
+  sourceObject = _messages.StringField(16, required=True)
+  userProject = _messages.StringField(17)
+
+
+class StorageObjectsDeleteRequest(_messages.Message):
+  r"""A StorageObjectsDeleteRequest object.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, permanently deletes a specific revision of this
+      object (as opposed to the latest version, the default).
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  object = _messages.StringField(7, required=True)
+  userProject = _messages.StringField(8)
+
+
+class StorageObjectsDeleteResponse(_messages.Message):
+  r"""An empty StorageObjectsDelete response."""
+
+
+class StorageObjectsGetIamPolicyRequest(_messages.Message):
+  r"""A StorageObjectsGetIamPolicyRequest object.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  userProject = _messages.StringField(4)
+
+
+class StorageObjectsGetRequest(_messages.Message):
+  r"""A StorageObjectsGetRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  object = _messages.StringField(7, required=True)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 8)
+  userProject = _messages.StringField(9)
+
+
+class StorageObjectsInsertRequest(_messages.Message):
+  r"""A StorageObjectsInsertRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl,
+      unless the object resource specifies the acl property, when it defaults
+      to full.
+
+  Fields:
+    bucket: Name of the bucket in which to store the new object. Overrides the
+      provided object metadata's bucket value, if any.
+    contentEncoding: If set, sets the contentEncoding property of the final
+      object to this value. Setting this parameter is equivalent to setting
+      the contentEncoding metadata property. This can be useful when uploading
+      an object with uploadType=media to indicate the encoding of the content
+      being uploaded.
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    kmsKeyName: Resource name of the Cloud KMS key, of the form projects/my-
+      project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be
+      used to encrypt the object. Overrides the object metadata's kms_key_name
+      value, if any.
+    name: Name of the object. Required when the object metadata is not
+      otherwise provided. Overrides the object metadata's name value, if any.
+      For information about how to URL encode object names to be path safe,
+      see Encoding URI Path Parts.
+    object: A Object resource to be passed as the request body.
+    predefinedAcl: Apply a predefined set of access controls to this object.
+    projection: Set of properties to return. Defaults to noAcl, unless the
+      object resource specifies the acl property, when it defaults to full.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl, unless the object
+    resource specifies the acl property, when it defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  contentEncoding = _messages.StringField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  kmsKeyName = _messages.StringField(7)
+  name = _messages.StringField(8)
+  object = _messages.MessageField('Object', 9)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 10)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 11)
+  userProject = _messages.StringField(12)
+
+
+class StorageObjectsListRequest(_messages.Message):
+  r"""A StorageObjectsListRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    bucket: Name of the bucket in which to look for objects.
+    delimiter: Returns results in a directory-like mode. items will contain
+      only objects whose names, aside from the prefix, do not contain
+      delimiter. Objects whose names, aside from the prefix, contain delimiter
+      will have their name, truncated after the delimiter, returned in
+      prefixes. Duplicate prefixes are omitted.
+    includeTrailingDelimiter: If true, objects that end in exactly one
+      instance of delimiter will have their metadata included in items in
+      addition to prefixes.
+    maxResults: Maximum number of items plus prefixes to return in a single
+      page of responses. As duplicate prefixes are omitted, fewer total
+      results may be returned than requested. The service will use this
+      parameter or 1,000 items, whichever is smaller.
+    pageToken: A previously-returned page token representing part of the
+      larger set of results to view.
+    prefix: Filter results to objects whose names begin with this prefix.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+    versions: If true, lists all versions of an object as distinct results.
+      The default is false. For more information, see Object Versioning.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  delimiter = _messages.StringField(2)
+  includeTrailingDelimiter = _messages.BooleanField(3)
+  maxResults = _messages.IntegerField(
+      4, variant=_messages.Variant.UINT32, default=1000)
+  pageToken = _messages.StringField(5)
+  prefix = _messages.StringField(6)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 7)
+  userProject = _messages.StringField(8)
+  versions = _messages.BooleanField(9)
+
+
+class StorageObjectsPatchRequest(_messages.Message):
+  r"""A StorageObjectsPatchRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to full.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectResource: A Object resource to be passed as the request body.
+    predefinedAcl: Apply a predefined set of access controls to this object.
+    projection: Set of properties to return. Defaults to full.
+    userProject: The project to be billed for this request, for Requester Pays
+      buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  object = _messages.StringField(7, required=True)
+  objectResource = _messages.MessageField('Object', 8)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 9)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 10)
+  userProject = _messages.StringField(11)
+
+
+class StorageObjectsRewriteRequest(_messages.Message):
+  r"""A StorageObjectsRewriteRequest object.
+
+  Enums:
+    DestinationPredefinedAclValueValuesEnum: Apply a predefined set of access
+      controls to the destination object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl,
+      unless the object resource specifies the acl property, when it defaults
+      to full.
+
+  Fields:
+    destinationBucket: Name of the bucket in which to store the new object.
+      Overrides the provided object metadata's bucket value, if any.
+    destinationKmsKeyName: Resource name of the Cloud KMS key, of the form
+      projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key,
+      that will be used to encrypt the object. Overrides the object metadata's
+      kms_key_name value, if any.
+    destinationObject: Name of the new object. Required when the object
+      metadata is not otherwise provided. Overrides the object metadata's name
+      value, if any. For information about how to URL encode object names to
+      be path safe, see Encoding URI Path Parts.
+    destinationPredefinedAcl: Apply a predefined set of access controls to the
+      destination object.
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      destination object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      destination object's current metageneration does not match the given
+      value.
+    ifSourceGenerationMatch: Makes the operation conditional on whether the
+      source object's current generation matches the given value.
+    ifSourceGenerationNotMatch: Makes the operation conditional on whether the
+      source object's current generation does not match the given value.
+    ifSourceMetagenerationMatch: Makes the operation conditional on whether
+      the source object's current metageneration matches the given value.
+    ifSourceMetagenerationNotMatch: Makes the operation conditional on whether
+      the source object's current metageneration does not match the given
+      value.
+    maxBytesRewrittenPerCall: The maximum number of bytes that will be
+      rewritten per rewrite request. Most callers shouldn't need to specify
+      this parameter - it is primarily in place to support testing. If
+      specified the value must be an integral multiple of 1 MiB (1048576).
+      Also, this only applies to requests where the source and destination
+      span locations and/or storage classes. Finally, this value must not
+      change across rewrite calls else you'll get an error that the
+      rewriteToken is invalid.
+    object: A Object resource to be passed as the request body.
+    projection: Set of properties to return. Defaults to noAcl, unless the
+      object resource specifies the acl property, when it defaults to full.
+    rewriteToken: Include this field (from the previous rewrite response) on
+      each rewrite request after the first one, until the rewrite response
+      'done' flag is true. Calls that provide a rewriteToken can omit all
+      other request fields, but if included those fields must match the values
+      provided in the first rewrite request.
+    sourceBucket: Name of the bucket in which to find the source object.
+    sourceGeneration: If present, selects a specific revision of the source
+      object (as opposed to the latest version, the default).
+    sourceObject: Name of the source object. For information about how to URL
+      encode object names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class DestinationPredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to the destination object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl, unless the object
+    resource specifies the acl property, when it defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  destinationBucket = _messages.StringField(1, required=True)
+  destinationKmsKeyName = _messages.StringField(2)
+  destinationObject = _messages.StringField(3, required=True)
+  destinationPredefinedAcl = _messages.EnumField(
+      'DestinationPredefinedAclValueValuesEnum', 4)
+  ifGenerationMatch = _messages.IntegerField(5)
+  ifGenerationNotMatch = _messages.IntegerField(6)
+  ifMetagenerationMatch = _messages.IntegerField(7)
+  ifMetagenerationNotMatch = _messages.IntegerField(8)
+  ifSourceGenerationMatch = _messages.IntegerField(9)
+  ifSourceGenerationNotMatch = _messages.IntegerField(10)
+  ifSourceMetagenerationMatch = _messages.IntegerField(11)
+  ifSourceMetagenerationNotMatch = _messages.IntegerField(12)
+  maxBytesRewrittenPerCall = _messages.IntegerField(13)
+  object = _messages.MessageField('Object', 14)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 15)
+  rewriteToken = _messages.StringField(16)
+  sourceBucket = _messages.StringField(17, required=True)
+  sourceGeneration = _messages.IntegerField(18)
+  sourceObject = _messages.StringField(19, required=True)
+  userProject = _messages.StringField(20)
+
+
+class StorageObjectsSetIamPolicyRequest(_messages.Message):
+  r"""A StorageObjectsSetIamPolicyRequest object.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    policy: A Policy resource to be passed as the request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  policy = _messages.MessageField('Policy', 4)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectsTestIamPermissionsRequest(_messages.Message):
+  r"""A StorageObjectsTestIamPermissionsRequest object.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    permissions: Permissions to test.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  permissions = _messages.StringField(4, required=True)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectsUpdateRequest(_messages.Message):
+  r"""A StorageObjectsUpdateRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to full.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectResource: A Object resource to be passed as the request body.
+    predefinedAcl: Apply a predefined set of access controls to this object.
+    projection: Set of properties to return. Defaults to full.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  object = _messages.StringField(7, required=True)
+  objectResource = _messages.MessageField('Object', 8)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 9)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 10)
+  userProject = _messages.StringField(11)
+
+
+class StorageObjectsWatchAllRequest(_messages.Message):
+  r"""A StorageObjectsWatchAllRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    bucket: Name of the bucket in which to look for objects.
+    channel: A Channel resource to be passed as the request body.
+    delimiter: Returns results in a directory-like mode. items will contain
+      only objects whose names, aside from the prefix, do not contain
+      delimiter. Objects whose names, aside from the prefix, contain delimiter
+      will have their name, truncated after the delimiter, returned in
+      prefixes. Duplicate prefixes are omitted.
+    includeTrailingDelimiter: If true, objects that end in exactly one
+      instance of delimiter will have their metadata included in items in
+      addition to prefixes.
+    maxResults: Maximum number of items plus prefixes to return in a single
+      page of responses. As duplicate prefixes are omitted, fewer total
+      results may be returned than requested. The service will use this
+      parameter or 1,000 items, whichever is smaller.
+    pageToken: A previously-returned page token representing part of the
+      larger set of results to view.
+    prefix: Filter results to objects whose names begin with this prefix.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+    versions: If true, lists all versions of an object as distinct results.
+      The default is false. For more information, see Object Versioning.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  channel = _messages.MessageField('Channel', 2)
+  delimiter = _messages.StringField(3)
+  includeTrailingDelimiter = _messages.BooleanField(4)
+  maxResults = _messages.IntegerField(
+      5, variant=_messages.Variant.UINT32, default=1000)
+  pageToken = _messages.StringField(6)
+  prefix = _messages.StringField(7)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 8)
+  userProject = _messages.StringField(9)
+  versions = _messages.BooleanField(10)
+
+
+class StorageProjectsServiceAccountGetRequest(_messages.Message):
+  r"""A StorageProjectsServiceAccountGetRequest object.
+
+  Fields:
+    projectId: Project ID
+    userProject: The project to be billed for this request.
+  """
+
+  projectId = _messages.StringField(1, required=True)
+  userProject = _messages.StringField(2)
+
+
+class TestIamPermissionsResponse(_messages.Message):
+  r"""A storage.(buckets|objects).testIamPermissions response.
+
+  Fields:
+    kind: The kind of item this is.
+    permissions: The permissions held by the caller. Permissions are always of
+      the format storage.resource.capability, where resource is one of buckets
+      or objects. The supported permissions are as follows:   -
+      storage.buckets.delete - Delete bucket.   - storage.buckets.get - Read
+      bucket metadata.   - storage.buckets.getIamPolicy - Read bucket IAM
+      policy.   - storage.buckets.create - Create bucket.   -
+      storage.buckets.list - List buckets.   - storage.buckets.setIamPolicy -
+      Update bucket IAM policy.   - storage.buckets.update - Update bucket
+      metadata.   - storage.objects.delete - Delete object.   -
+      storage.objects.get - Read object data and metadata.   -
+      storage.objects.getIamPolicy - Read object IAM policy.   -
+      storage.objects.create - Create object.   - storage.objects.list - List
+      objects.   - storage.objects.setIamPolicy - Update object IAM policy.
+      - storage.objects.update - Update object metadata.
+  """
+
+  kind = _messages.StringField(1, default=u'storage#testIamPermissionsResponse')
+  permissions = _messages.StringField(2, repeated=True)
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index d9dea3efdc4..ce37688ef82 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -95,7 +95,6 @@ class SetupTest(unittest.TestCase):
             errors, ['project', 'staging_location', 'temp_location', 'region']),
         [])
 
-  @unittest.skip('Not compatible with new GCS client. See GH issue #26335.')
   def test_gcs_path(self):
     def get_validator(temp_location, staging_location):
       options = ['--project=example:example', '--job_name=job']
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 78dfc175e58..5f1d3c0c329 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -55,6 +55,7 @@ from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import StandardOptions
@@ -491,19 +492,12 @@ class DataflowApplicationClient(object):
     self._root_staging_location = (
         root_staging_location or self.google_cloud_options.staging_location)
 
-    from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled
-    from google.cloud import storage
-    if _is_runner_v2_disabled(options):
-      self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION
-    else:
-      self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
+    self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
 
     if self.google_cloud_options.no_auth:
       credentials = None
-      storage_credentials = None
     else:
       credentials = get_service_credentials(options)
-      storage_credentials = credentials.get_google_auth_credentials()
 
     http_client = get_new_http()
     self._client = dataflow.DataflowV1b3(
@@ -512,10 +506,12 @@ class DataflowApplicationClient(object):
         get_credentials=(not self.google_cloud_options.no_auth),
         http=http_client,
         response_encoding=get_response_encoding())
-    if storage_credentials:
-      self._storage_client = storage.Client(credentials=storage_credentials)
-    else:
-      self._storage_client = storage.Client.create_anonymous_client()
+    self._storage_client = storage.StorageV1(
+        url='https://www.googleapis.com/storage/v1',
+        credentials=credentials,
+        get_credentials=(not self.google_cloud_options.no_auth),
+        http=http_client,
+        response_encoding=get_response_encoding())
     self._sdk_image_overrides = self._get_sdk_image_overrides(options)
 
   def _get_sdk_image_overrides(self, pipeline_options):
@@ -658,8 +654,6 @@ class DataflowApplicationClient(object):
       mime_type='application/octet-stream',
       total_size=None):
     """Stages a file at a GCS or local path with stream-supplied contents."""
-    from google.cloud.exceptions import Forbidden
-    from google.cloud.exceptions import NotFound
     if not gcs_or_local_path.startswith('gs://'):
       local_path = FileSystems.join(gcs_or_local_path, file_name)
       _LOGGER.info('Staging file locally to %s', local_path)
@@ -667,36 +661,31 @@ class DataflowApplicationClient(object):
         f.write(stream.read())
       return
     gcs_location = FileSystems.join(gcs_or_local_path, file_name)
-    bucket_name, blob_name = gcs_location[5:].split('/', 1)
+    bucket, name = gcs_location[5:].split('/', 1)
+
+    request = storage.StorageObjectsInsertRequest(bucket=bucket, name=name)
     start_time = time.time()
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
+    upload = storage.Upload(stream, mime_type, total_size)
     try:
-      from google.cloud.storage import Blob
-      from google.cloud.storage.fileio import BlobWriter
-      bucket = self._storage_client.get_bucket(bucket_name)
-      blob = bucket.get_blob(blob_name)
-      if not blob:
-        blob = Blob(blob_name, bucket)
-      with BlobWriter(blob) as f:
-        f.write(stream.read())
-      return
-    except Exception as e:
-      reportable_errors = [
-          Forbidden,
-          NotFound,
-      ]
-      if type(e) in reportable_errors:
+      response = self._storage_client.objects.Insert(request, upload=upload)
+    except exceptions.HttpError as e:
+      reportable_errors = {
+          403: 'access denied',
+          404: 'bucket not found',
+      }
+      if e.status_code in reportable_errors:
         raise IOError((
             'Could not upload to GCS path %s: %s. Please verify '
-            'that credentials are valid, that the specified path '
-            'exists, and that you have write access to it.') %
-                      (gcs_or_local_path, e))
+            'that credentials are valid and that you have write '
+            'access to the specified path.') %
+                      (gcs_or_local_path, reportable_errors[e.status_code]))
       raise
-    finally:
-      _LOGGER.info(
-          'Completed GCS upload to %s in %s seconds.',
-          gcs_location,
-          int(time.time() - start_time))
+    _LOGGER.info(
+        'Completed GCS upload to %s in %s seconds.',
+        gcs_location,
+        int(time.time() - start_time))
+    return response
 
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def create_job(self, job):
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index b5fcee48c29..bdd9ab4d1a8 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -33,6 +33,8 @@ import apache_beam as beam
 from apache_beam.dataframe.convert import to_pcollection
 from apache_beam.dataframe.frame_base import DeferredBase
 from apache_beam.internal.gcp import auth
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import Pipeline
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -449,23 +451,21 @@ def assert_bucket_exists(bucket_name):
     Logs a warning if the bucket cannot be verified to exist.
   """
   try:
-    from google.cloud.exceptions import ClientError
-    from google.cloud.exceptions import NotFound
-    from google.cloud import storage
-    credentials = auth.get_service_credentials(PipelineOptions())
-    if credentials:
-      storage_client = storage.Client(credentials=credentials)
-    else:
-      storage_client = storage.Client.create_anonymous_client()
-    storage_client.get_bucket(bucket_name)
-  except ClientError as e:
-    if isinstance(e, NotFound):
+    from apitools.base.py.exceptions import HttpError
+    storage_client = storage.StorageV1(
+        credentials=auth.get_service_credentials(PipelineOptions()),
+        get_credentials=False,
+        http=get_new_http(),
+        response_encoding='utf8')
+    request = storage.StorageBucketsGetRequest(bucket=bucket_name)
+    storage_client.buckets.Get(request)
+  except HttpError as e:
+    if e.status_code == 404:
       _LOGGER.error('%s bucket does not exist!', bucket_name)
       raise ValueError('Invalid GCS bucket provided!')
     else:
       _LOGGER.warning(
-          'ClientError - unable to verify whether bucket %s exists',
-          bucket_name)
+          'HttpError - unable to verify whether bucket %s exists', bucket_name)
   except ImportError:
     _LOGGER.warning(
         'ImportError - unable to verify whether bucket %s exists', bucket_name)
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index f3d7f96b0db..ecb71a2bdef 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -46,22 +46,27 @@ from apache_beam.utils.windowed_value import WindowedValue
 
 # Protect against environments where apitools library is not available.
 try:
-  from google.cloud.exceptions import BadRequest, NotFound
+  from apitools.base.py.exceptions import HttpError
+  from apitools.base.py.exceptions import HttpNotFoundError
 except ImportError:
   _http_error_imported = False
+  HttpError = ValueError
+  HttpNotFoundError = ValueError
 else:
   _http_error_imported = True
 
 
-class MockStorageClient():
-  def __init__(self):
-    pass
-
-  def get_bucket(self, path):
+class MockBuckets():
+  def Get(self, path):
     if path == 'test-bucket-not-found':
-      raise NotFound('Bucket not found')
+      raise HttpNotFoundError({'status': 404}, {}, '')
     elif path == 'test-bucket-not-verified':
-      raise BadRequest('Request faulty')
+      raise HttpError({'status': 400}, {}, '')
+
+
+class MockStorageClient():
+  def __init__(self, buckets=MockBuckets()):
+    self.buckets = buckets
 
 
 class Record(NamedTuple):
@@ -348,21 +353,29 @@ class GeneralUtilTest(unittest.TestCase):
     self.assertIs(getattr(main_session, name, None), value)
 
 
-@patch('google.cloud.storage.Client', return_value=MockStorageClient())
+@patch(
+    'apache_beam.io.gcp.internal.clients.storage.StorageV1',
+    return_value=MockStorageClient())
 @unittest.skipIf(not _http_error_imported, 'http errors are not imported.')
 class GCSUtilsTest(unittest.TestCase):
-  @patch('google.cloud.storage.Client.get_bucket')
+  @patch(
+      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
+      return_value='test-bucket-not-found')
   def test_assert_bucket_exists_not_found(self, mock_response, mock_client):
     with self.assertRaises(ValueError):
-      utils.assert_bucket_exists('test-bucket-not-found')
+      utils.assert_bucket_exists('')
 
-  @patch('google.cloud.storage.Client.get_bucket')
+  @patch(
+      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
+      return_value='test-bucket-not-verified')
   def test_assert_bucket_exists_not_verified(self, mock_response, mock_client):
     from apache_beam.runners.interactive.utils import _LOGGER
     with self.assertLogs(_LOGGER, level='WARNING'):
-      utils.assert_bucket_exists('test-bucket-not-verified')
+      utils.assert_bucket_exists('')
 
-  @patch('google.cloud.storage.Client.get_bucket')
+  @patch(
+      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
+      return_value='test-bucket-found')
   def test_assert_bucket_exists_found(self, mock_response, mock_client):
     utils.assert_bucket_exists('')
 
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
index 0e8e0d97b46..19becd3e123 100644
--- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -41,6 +41,7 @@ from google.protobuf.json_format import MessageToJson
 from apache_beam import version as beam_version
 from apache_beam.internal.gcp.auth import get_service_credentials
 from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions  # pylint: disable=unused-import
 from apache_beam.options.pipeline_options import SetupOptions
@@ -209,12 +210,12 @@ class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
       credentials = None
     else:
       credentials = get_service_credentials(options)
-    from google.cloud import storage
-    if credentials:
-      self._storage_client = storage.Client(
-          credentials=credentials.get_google_auth_credentials())
-    else:
-      self._storage_client = storage.Client.create_anonymous_client()
+    self._storage_client = storage.StorageV1(
+        url='https://www.googleapis.com/storage/v1',
+        credentials=credentials,
+        get_credentials=(not self._google_cloud_options.no_auth),
+        http=get_new_http(),
+        response_encoding='utf8')
     self._cloudbuild_client = cloudbuild.CloudbuildV1(
         credentials=credentials,
         get_credentials=(not self._google_cloud_options.no_auth),
@@ -306,23 +307,27 @@ class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
         "Python SDK container built and pushed as %s." % container_image_name)
 
   def _upload_to_gcs(self, local_file_path, gcs_location):
-    bucket_name, blob_name = self._get_gcs_bucket_and_name(gcs_location)
+    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
+    request = storage.StorageObjectsInsertRequest(
+        bucket=gcs_bucket, name=gcs_object)
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    from google.cloud import storage
-    from google.cloud.exceptions import Forbidden
-    from google.cloud.exceptions import NotFound
+    total_size = os.path.getsize(local_file_path)
+    from apitools.base.py import exceptions
     try:
-      bucket = self._storage_client.get_bucket(bucket_name)
-      blob = bucket.get_blob(blob_name)
-      if not blob:
-        blob = storage.Blob(name=blob_name, bucket=bucket)
-      blob.upload_from_filename(local_file_path)
-    except Exception as e:
-      if isinstance(e, (Forbidden, NotFound)):
+      with open(local_file_path, 'rb') as stream:
+        upload = storage.Upload(stream, 'application/octet-stream', total_size)
+        self._storage_client.objects.Insert(request, upload=upload)
+    except exceptions.HttpError as e:
+      reportable_errors = {
+          403: 'access denied',
+          404: 'bucket not found',
+      }
+      if e.status_code in reportable_errors:
         raise IOError((
             'Could not upload to GCS path %s: %s. Please verify '
             'that credentials are valid and that you have write '
-            'access to the specified path.') % (gcs_location, e.message))
+            'access to the specified path.') %
+                      (gcs_location, reportable_errors[e.status_code]))
       raise
     _LOGGER.info('Completed GCS upload to %s.', gcs_location)
 
diff --git a/sdks/python/container/py310/base_image_requirements.txt b/sdks/python/container/py310/base_image_requirements.txt
index e79616eda57..5d88dae5250 100644
--- a/sdks/python/container/py310/base_image_requirements.txt
+++ b/sdks/python/container/py310/base_image_requirements.txt
@@ -156,4 +156,4 @@ urllib3==1.26.16
 websocket-client==1.6.1
 Werkzeug==2.3.6
 wrapt==1.15.0
-zstandard==0.21.0
+zstandard==0.21.0
\ No newline at end of file
diff --git a/sdks/python/container/py311/base_image_requirements.txt b/sdks/python/container/py311/base_image_requirements.txt
index 4bb5bc597a8..e3f4b0b9393 100644
--- a/sdks/python/container/py311/base_image_requirements.txt
+++ b/sdks/python/container/py311/base_image_requirements.txt
@@ -150,4 +150,4 @@ urllib3==1.26.16
 websocket-client==1.6.1
 Werkzeug==2.3.6
 wrapt==1.15.0
-zstandard==0.21.0
+zstandard==0.21.0
\ No newline at end of file
diff --git a/sdks/python/container/py38/base_image_requirements.txt b/sdks/python/container/py38/base_image_requirements.txt
index a71accf47ff..60a423c3f37 100644
--- a/sdks/python/container/py38/base_image_requirements.txt
+++ b/sdks/python/container/py38/base_image_requirements.txt
@@ -158,4 +158,4 @@ websocket-client==1.6.1
 Werkzeug==2.3.6
 wrapt==1.15.0
 zipp==3.16.2
-zstandard==0.21.0
+zstandard==0.21.0
\ No newline at end of file
diff --git a/sdks/python/container/py39/base_image_requirements.txt b/sdks/python/container/py39/base_image_requirements.txt
index 0e046d8cc69..e65b4c8e143 100644
--- a/sdks/python/container/py39/base_image_requirements.txt
+++ b/sdks/python/container/py39/base_image_requirements.txt
@@ -158,4 +158,4 @@ websocket-client==1.6.1
 Werkzeug==2.3.6
 wrapt==1.15.0
 zipp==3.16.2
-zstandard==0.21.0
+zstandard==0.21.0
\ No newline at end of file
diff --git a/sdks/python/mypy.ini b/sdks/python/mypy.ini
index 298f249ffbf..46dea481f93 100644
--- a/sdks/python/mypy.ini
+++ b/sdks/python/mypy.ini
@@ -39,6 +39,9 @@ ignore_errors = true
 # error: Cannot infer type of lambda  [misc]
 ignore_errors = true
 
+[mypy-apache_beam.io.gcp.internal.clients.storage.storage_v1_client]
+ignore_errors = true
+
 [mypy-apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_client]
 ignore_errors = true
 
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 027c0c6dbb9..17f612b2d42 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -310,7 +310,6 @@ if __name__ == '__main__':
             'google-cloud-datastore>=2.0.0,<3',
             'google-cloud-pubsub>=2.1.0,<3',
             'google-cloud-pubsublite>=1.2.0,<2',
-            'google-cloud-storage>=2.10.0,<3',
             # GCP packages required by tests
             'google-cloud-bigquery>=2.0.0,<4',
             'google-cloud-bigquery-storage>=2.6.3,<3',