You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2023/07/07 11:12:48 UTC

[beam] 01/01: Revert "Replace StorageV1 client with GCS client (#25965)"

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch revert-25965-replace-storage-client
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f35d83647aeea7007d9c75ff82c0220e81e035b5
Author: Danny McCormick <da...@google.com>
AuthorDate: Fri Jul 7 07:12:41 2023 -0400

    Revert "Replace StorageV1 client with GCS client (#25965)"
    
    This reverts commit 34a17963d57cc9b7dbf8cd0225dde63c5e254925.
---
 .../assets/symbols/python.g.yaml                   |    8 +
 .../examples/complete/game/user_score.py           |    1 -
 sdks/python/apache_beam/internal/gcp/auth.py       |    7 +-
 sdks/python/apache_beam/io/gcp/bigquery_test.py    |    4 +-
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py    |   33 +-
 .../apache_beam/io/gcp/gcsfilesystem_test.py       |   17 +-
 sdks/python/apache_beam/io/gcp/gcsio.py            |  625 +++--
 .../apache_beam/io/gcp/gcsio_integration_test.py   |  149 +-
 sdks/python/apache_beam/io/gcp/gcsio_overrides.py  |   55 +
 sdks/python/apache_beam/io/gcp/gcsio_test.py       |  886 +++++--
 .../io/gcp/internal/clients/storage/__init__.py    |   33 +
 .../internal/clients/storage/storage_v1_client.py  | 1517 +++++++++++
 .../clients/storage/storage_v1_messages.py         | 2714 ++++++++++++++++++++
 .../options/pipeline_options_validator_test.py     |    1 -
 .../runners/dataflow/internal/apiclient.py         |   52 +-
 .../apache_beam/runners/interactive/utils.py       |   26 +-
 .../apache_beam/runners/interactive/utils_test.py  |   41 +-
 .../runners/portability/sdk_container_builder.py   |   41 +-
 sdks/python/mypy.ini                               |    3 +
 sdks/python/setup.py                               |    1 -
 20 files changed, 5737 insertions(+), 477 deletions(-)

diff --git a/playground/frontend/playground_components/assets/symbols/python.g.yaml b/playground/frontend/playground_components/assets/symbols/python.g.yaml
index 0b9e5e142de..a47447225a6 100644
--- a/playground/frontend/playground_components/assets/symbols/python.g.yaml
+++ b/playground/frontend/playground_components/assets/symbols/python.g.yaml
@@ -4790,6 +4790,10 @@ GBKTransform:
   - from_runner_api_parameter
   - to_runner_api_parameter
 GcpTestIOError: {}
+GcsDownloader:
+  methods:
+  - get_range
+  - size
 GCSFileSystem:
   methods:
   - checksum
@@ -4833,6 +4837,10 @@ GcsIOError: {}
 GcsIOOverrides:
   methods:
   - retry_func
+GcsUploader:
+  methods:
+  - finish
+  - put
 GeneralPurposeConsumerSet:
   methods:
   - flush
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 03f0d00fc30..564cea8c425 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -177,7 +177,6 @@ def run(argv=None, save_main_session=True):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-
     (  # pylint: disable=expression-not-assigned
         p
         | 'ReadInputText' >> beam.io.ReadFromText(args.input)
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 7e54ba0a4ba..47c3416babd 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -111,9 +111,6 @@ if _GOOGLE_AUTH_AVAILABLE:
       """Delegate attribute access to underlying google-auth credentials."""
       return getattr(self._google_auth_credentials, attr)
 
-    def get_google_auth_credentials(self):
-      return self._google_auth_credentials
-
 
 class _Credentials(object):
   _credentials_lock = threading.Lock()
@@ -122,7 +119,7 @@ class _Credentials(object):
 
   @classmethod
   def get_service_credentials(cls, pipeline_options):
-    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]
+    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
     with cls._credentials_lock:
       if cls._credentials_init:
         return cls._credentials
@@ -142,7 +139,7 @@ class _Credentials(object):
 
   @staticmethod
   def _get_service_credentials(pipeline_options):
-    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]
+    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
     if not _GOOGLE_AUTH_AVAILABLE:
       _LOGGER.warning(
           'Unable to find default credentials because the google-auth library '
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index fe1a568f414..3a3033dfcaf 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -827,7 +827,6 @@ class TestWriteToBigQuery(unittest.TestCase):
           exception_type=exceptions.ServiceUnavailable if exceptions else None,
           error_message='backendError')
   ])
-  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')
   def test_load_job_exception(self, exception_type, error_message):
 
     with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
@@ -867,7 +866,6 @@ class TestWriteToBigQuery(unittest.TestCase):
           exception_type=exceptions.InternalServerError if exceptions else None,
           error_message='internalError'),
   ])
-  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')
   def test_copy_load_job_exception(self, exception_type, error_message):
 
     from apache_beam.io.gcp import bigquery_file_loads
@@ -886,7 +884,7 @@ class TestWriteToBigQuery(unittest.TestCase):
       mock.patch.object(BigQueryWrapper,
                         'wait_for_bq_job'), \
       mock.patch('apache_beam.io.gcp.internal.clients'
-        '.storage.storage_v1_client.StorageV1.ObjectsService'),\
+        '.storage.storage_v1_client.StorageV1.ObjectsService'), \
       mock.patch('time.sleep'), \
       self.assertRaises(Exception) as exc, \
       beam.Pipeline() as p:
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index f40509493c1..c87a8499c91 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -254,13 +254,24 @@ class GCSFileSystem(FileSystem):
       gcs_batches.append(gcs_current_batch)
 
     # Execute GCS renames if any and return exceptions.
-    try:
-      for batch in gcs_batches:
-        self._gcsIO().copy_batch(batch)
-        self._gcsIO().delete_batch(source_file_names)
+    exceptions = {}
+    for batch in gcs_batches:
+      copy_statuses = self._gcsIO().copy_batch(batch)
+      copy_succeeded = []
+      for src, dest, exception in copy_statuses:
+        if exception:
+          exceptions[(src, dest)] = exception
+        else:
+          copy_succeeded.append((src, dest))
+      delete_batch = [src for src, dest in copy_succeeded]
+      delete_statuses = self._gcsIO().delete_batch(delete_batch)
+      for i, (src, exception) in enumerate(delete_statuses):
+        dest = copy_succeeded[i][1]
+        if exception:
+          exceptions[(src, dest)] = exception
 
-    except Exception as exception:
-      raise BeamIOError("Rename operation failed", exception)
+    if exceptions:
+      raise BeamIOError("Rename operation failed", exceptions)
 
   def exists(self, path):
     """Check if the provided path exists on the FileSystem.
@@ -329,7 +340,8 @@ class GCSFileSystem(FileSystem):
     """
     try:
       file_metadata = self._gcsIO()._status(path)
-      return FileMetadata(path, file_metadata['size'], file_metadata['updated'])
+      return FileMetadata(
+          path, file_metadata['size'], file_metadata['last_updated'])
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Metadata operation failed", {path: e})
 
@@ -348,7 +360,12 @@ class GCSFileSystem(FileSystem):
       else:
         path_to_use = path
       match_result = self.match([path_to_use])[0]
-      self._gcsIO().delete_batch([m.path for m in match_result.metadata_list])
+      statuses = self._gcsIO().delete_batch(
+          [m.path for m in match_result.metadata_list])
+      # pylint: disable=used-before-assignment
+      failures = [e for (_, e) in statuses if e is not None]
+      if failures:
+        raise failures[0]
 
     exceptions = {}
     for path in paths:
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index c3ca88a1643..800bd5d1c46 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -272,7 +272,8 @@ class GCSFileSystemTest(unittest.TestCase):
         'gs://bucket/to2',
         'gs://bucket/to3',
     ]
-    gcsio_mock.delete_batch.side_effect = Exception("BadThings")
+    exception = IOError('Failed')
+    gcsio_mock.delete_batch.side_effect = [[(f, exception) for f in sources]]
     gcsio_mock.copy_batch.side_effect = [[
         ('gs://bucket/from1', 'gs://bucket/to1', None),
         ('gs://bucket/from2', 'gs://bucket/to2', None),
@@ -280,8 +281,16 @@ class GCSFileSystemTest(unittest.TestCase):
     ]]
 
     # Issue batch rename.
-    with self.assertRaisesRegex(BeamIOError, r'^Rename operation failed'):
+    expected_results = {
+        (s, d): exception
+        for s, d in zip(sources, destinations)
+    }
+
+    # Issue batch rename.
+    with self.assertRaisesRegex(BeamIOError,
+                                r'^Rename operation failed') as error:
       self.fs.rename(sources, destinations)
+    self.assertEqual(error.exception.exception_details, expected_results)
 
     gcsio_mock.copy_batch.assert_called_once_with([
         ('gs://bucket/from1', 'gs://bucket/to1'),
@@ -299,7 +308,7 @@ class GCSFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
-    gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0}
+    gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -317,7 +326,7 @@ class GCSFileSystemTest(unittest.TestCase):
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.delete_batch.side_effect = exception
-    gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0}
+    gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index dea430c1dbc..541bbfd9411 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -29,18 +29,28 @@ https://github.com/apache/beam/blob/master/sdks/python/OWNERS
 
 # pytype: skip-file
 
+import errno
+import io
 import logging
+import multiprocessing
 import re
+import threading
 import time
+import traceback
+from itertools import islice
 from typing import Optional
 from typing import Union
 
-from google.cloud import storage
-from google.cloud.exceptions import NotFound
-from google.cloud.storage.fileio import BlobReader
-from google.cloud.storage.fileio import BlobWriter
-
-from apache_beam.internal.gcp import auth
+import apache_beam
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.internal.metrics.metric import ServiceCallMetric
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import PipeStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.io.gcp import resource_identifiers
+from apache_beam.metrics import monitoring_infos
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import retry
 from apache_beam.utils.annotations import deprecated
@@ -49,11 +59,58 @@ __all__ = ['GcsIO']
 
 _LOGGER = logging.getLogger(__name__)
 
+# Issue a friendlier error message if the storage library is not available.
+# TODO(silviuc): Remove this guard when storage is available everywhere.
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from apitools.base.py.batch import BatchApiRequest
+  from apitools.base.py.exceptions import HttpError
+  from apitools.base.py import transfer
+  from apache_beam.internal.gcp import auth
+  from apache_beam.io.gcp.internal.clients import storage
+except ImportError:
+  raise ImportError(
+      'Google Cloud Storage I/O not supported for this execution environment '
+      '(could not import storage API client).')
+
+# This is the size of each partial-file read operation from GCS.  This
+# parameter was chosen to give good throughput while keeping memory usage at
+# a reasonable level; the following table shows throughput reached when
+# reading files of a given size with a chosen buffer size and informed the
+# choice of the value, as of 11/2016:
+#
+# +---------------+------------+-------------+-------------+-------------+
+# |               | 50 MB file | 100 MB file | 200 MB file | 400 MB file |
+# +---------------+------------+-------------+-------------+-------------+
+# | 8 MB buffer   | 17.12 MB/s | 22.67 MB/s  | 23.81 MB/s  | 26.05 MB/s  |
+# | 16 MB buffer  | 24.21 MB/s | 42.70 MB/s  | 42.89 MB/s  | 46.92 MB/s  |
+# | 32 MB buffer  | 28.53 MB/s | 48.08 MB/s  | 54.30 MB/s  | 54.65 MB/s  |
+# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s  | 79.13 MB/s  | 85.39 MB/s  |
+# +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
+# This is the number of seconds the library will wait for a partial-file read
+# operation from GCS to complete before retrying.
+DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
+
+# This is the size of chunks used when writing to GCS.
+WRITE_CHUNK_SIZE = 8 * 1024 * 1024
+
 # Maximum number of operations permitted in GcsIO.copy_batch() and
 # GcsIO.delete_batch().
-MAX_BATCH_OPERATION_SIZE = 1000
+MAX_BATCH_OPERATION_SIZE = 100
+
+# Batch endpoint URL for GCS.
+# We have to specify an API specific endpoint here since Google APIs global
+# batch endpoints will be deprecated on 03/25/2019.
+# See https://developers.googleblog.com/2018/03/discontinuing-support-for-json-rpc-and.html.  # pylint: disable=line-too-long
+# Currently apitools library uses a global batch endpoint by default:
+# https://github.com/google/apitools/blob/master/apitools/base/py/batch.py#L152
+# TODO: remove this constant and it's usage after apitools move to using an API
+# specific batch endpoint or after Beam gcsio module start using a GCS client
+# library that does not use global batch endpoints.
+GCS_BATCH_ENDPOINT = 'https://www.googleapis.com/batch/storage/v1'
 
 
 def parse_gcs_path(gcs_path, object_optional=False):
@@ -97,21 +154,29 @@ def get_or_create_default_gcs_bucket(options):
         bucket_name, project, location=region)
 
 
+class GcsIOError(IOError, retry.PermanentException):
+  """GCS IO error that should not be retried."""
+  pass
+
+
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None
+    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None
     if storage_client is None:
       if not pipeline_options:
         pipeline_options = PipelineOptions()
       elif isinstance(pipeline_options, dict):
         pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
-      credentials = auth.get_service_credentials(pipeline_options)
-      if credentials:
-        storage_client = storage.Client(
-            credentials=credentials.get_google_auth_credentials())
-      else:
-        storage_client = storage.Client.create_anonymous_client()
+      storage_client = storage.StorageV1(
+          credentials=auth.get_service_credentials(pipeline_options),
+          get_credentials=False,
+          http=get_new_http(),
+          response_encoding='utf8',
+          additional_http_headers={
+              "User-Agent": "apache-beam/%s (GPN:Beam)" %
+              apache_beam.__version__
+          })
     self.client = storage_client
     self._rewrite_cb = None
     self.bucket_to_project_number = {}
@@ -121,30 +186,40 @@ class GcsIO(object):
       bucket_metadata = self.get_bucket(bucket_name=bucket)
       if bucket_metadata:
         self.bucket_to_project_number[bucket] = bucket_metadata.projectNumber
+      #  else failed to load the bucket metadata due to HttpError
 
     return self.bucket_to_project_number.get(bucket, None)
 
+  def _set_rewrite_response_callback(self, callback):
+    """For testing purposes only. No backward compatibility guarantees.
+
+    Args:
+      callback: A function that receives ``storage.RewriteResponse``.
+    """
+    self._rewrite_cb = callback
+
   def get_bucket(self, bucket_name):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      return self.client.lookup_bucket(bucket_name)
-    except NotFound:
+      request = storage.StorageBucketsGetRequest(bucket=bucket_name)
+      return self.client.buckets.Get(request)
+    except HttpError:
       return None
 
   def create_bucket(self, bucket_name, project, kms_key=None, location=None):
     """Create and return a GCS bucket in a specific project."""
-
+    encryption = None
+    if kms_key:
+      encryption = storage.Bucket.EncryptionValue(kms_key)
+
+    request = storage.StorageBucketsInsertRequest(
+        bucket=storage.Bucket(
+            name=bucket_name, location=location, encryption=encryption),
+        project=project,
+    )
     try:
-      bucket = self.client.create_bucket(
-          bucket_or_name=bucket_name,
-          project=project,
-          location=location,
-      )
-      if kms_key:
-        bucket.default_kms_key_name(kms_key)
-        bucket.patch()
-      return bucket
-    except NotFound:
+      return self.client.buckets.Insert(request)
+    except HttpError:
       return None
 
   def open(
@@ -167,18 +242,24 @@ class GcsIO(object):
     Raises:
       ValueError: Invalid open file mode.
     """
-    bucket_name, blob_name = parse_gcs_path(filename)
-    bucket = self.client.get_bucket(bucket_name)
-
     if mode == 'r' or mode == 'rb':
-      blob = bucket.get_blob(blob_name)
-      return BeamBlobReader(blob, chunk_size=read_buffer_size)
+      downloader = GcsDownloader(
+          self.client,
+          filename,
+          buffer_size=read_buffer_size,
+          get_project_number=self.get_project_number)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
     elif mode == 'w' or mode == 'wb':
-      blob = bucket.get_blob(blob_name)
-      if not blob:
-        blob = storage.Blob(blob_name, bucket)
-      return BeamBlobWriter(blob, mime_type)
-
+      uploader = GcsUploader(
+          self.client,
+          filename,
+          mime_type,
+          get_project_number=self.get_project_number)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
     else:
       raise ValueError('Invalid file open mode: %s.' % mode)
 
@@ -190,101 +271,186 @@ class GcsIO(object):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket_name, blob_name = parse_gcs_path(path)
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsDeleteRequest(
+        bucket=bucket, object=object_path)
     try:
-      bucket = self.client.get_bucket(bucket_name)
-      bucket.delete_blob(blob_name)
-    except NotFound:
-      return
-
+      self.client.objects.Delete(request)
+    except HttpError as http_error:
+      if http_error.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      raise
+
+  # We intentionally do not decorate this method with a retry, as retrying is
+  # handled in BatchApiRequest.Execute().
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-    """
-    final_results = []
-    s = 0
-    while s < len(paths):
-      if (s + MAX_BATCH_OPERATION_SIZE) < len(paths):
-        current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
-      else:
-        current_paths = paths[s:]
-      current_batch = self.client.batch(raise_exception=False)
-      with current_batch:
-        for path in current_paths:
-          bucket_name, blob_name = parse_gcs_path(path)
-          bucket = self.client.get_bucket(bucket_name)
-          blobs = [blob_name]
-          bucket.delete_blobs(blobs, on_error=lambda blob: None)
 
-      final_results.extend(current_batch._responses)
-
-      s += MAX_BATCH_OPERATION_SIZE
-
-    return final_results
+    Returns: List of tuples of (path, exception) in the same order as the paths
+             argument, where exception is None if the operation succeeded or
+             the relevant exception if the operation failed.
+    """
+    if not paths:
+      return []
+
+    paths = iter(paths)
+    result_statuses = []
+    while True:
+      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
+      if not paths_chunk:
+        return result_statuses
+      batch_request = BatchApiRequest(
+          batch_url=GCS_BATCH_ENDPOINT,
+          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
+          response_encoding='utf-8')
+      for path in paths_chunk:
+        bucket, object_path = parse_gcs_path(path)
+        request = storage.StorageObjectsDeleteRequest(
+            bucket=bucket, object=object_path)
+        batch_request.Add(self.client.objects, 'Delete', request)
+      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+      for i, api_call in enumerate(api_calls):
+        path = paths_chunk[i]
+        exception = None
+        if api_call.is_error:
+          exception = api_call.exception
+          # Return success when the file doesn't exist anymore for idempotency.
+          if isinstance(exception, HttpError) and exception.status_code == 404:
+            exception = None
+        result_statuses.append((path, exception))
+    return result_statuses
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(self, src, dest):
+  def copy(
+      self,
+      src,
+      dest,
+      dest_kms_key_name=None,
+      max_bytes_rewritten_per_call=None):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite API call will return after these many bytes.
+        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket_name, src_blob_name = parse_gcs_path(src)
-    dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True)
-    src_bucket = self.get_bucket(src_bucket_name)
-    src_blob = src_bucket.get_blob(src_blob_name)
-    if not src_blob:
-      raise NotFound("Source %s not found", src)
-    dest_bucket = self.get_bucket(dest_bucket_name)
-    if not dest_blob_name:
-      dest_blob_name = None
-    src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name)
-
-  def copy_batch(self, src_dest_pairs):
-    """Copies the given GCS objects from src to dest.
+    src_bucket, src_path = parse_gcs_path(src)
+    dest_bucket, dest_path = parse_gcs_path(dest)
+    request = storage.StorageObjectsRewriteRequest(
+        sourceBucket=src_bucket,
+        sourceObject=src_path,
+        destinationBucket=dest_bucket,
+        destinationObject=dest_path,
+        destinationKmsKeyName=dest_kms_key_name,
+        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+    response = self.client.objects.Rewrite(request)
+    while not response.done:
+      _LOGGER.debug(
+          'Rewrite progress: %d of %d bytes, %s to %s',
+          response.totalBytesRewritten,
+          response.objectSize,
+          src,
+          dest)
+      request.rewriteToken = response.rewriteToken
+      response = self.client.objects.Rewrite(request)
+      if self._rewrite_cb is not None:
+        self._rewrite_cb(response)
+
+    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
+
+  # We intentionally do not decorate this method with a retry, as retrying is
+  # handled in BatchApiRequest.Execute().
+  def copy_batch(
+      self,
+      src_dest_pairs,
+      dest_kms_key_name=None,
+      max_bytes_rewritten_per_call=None):
+    """Copies the given GCS object from src to dest.
 
     Args:
       src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
                       paths to copy from src to dest, not to exceed
                       MAX_BATCH_OPERATION_SIZE in length.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite call will return after these many bytes. Used
+        primarily for testing.
 
     Returns: List of tuples of (src, dest, exception) in the same order as the
              src_dest_pairs argument, where exception is None if the operation
              succeeded or the relevant exception if the operation failed.
     """
-    final_results = []
-    s = 0
-    while s < len(src_dest_pairs):
-      if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs):
-        current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE]
-      else:
-        current_pairs = src_dest_pairs[s:]
-      current_batch = self.client.batch(raise_exception=False)
-      with current_batch:
-        for pair in current_pairs:
-          src_bucket_name, src_blob_name = parse_gcs_path(pair[0])
-          dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1])
-          src_bucket = self.client.get_bucket(src_bucket_name)
-          src_blob = src_bucket.get_blob(src_blob_name)
-          dest_bucket = self.client.get_bucket(dest_bucket_name)
-
-          src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name)
-
-      final_results += current_batch._responses
-      s += MAX_BATCH_OPERATION_SIZE
+    if not src_dest_pairs:
+      return []
+    pair_to_request = {}
+    for pair in src_dest_pairs:
+      src_bucket, src_path = parse_gcs_path(pair[0])
+      dest_bucket, dest_path = parse_gcs_path(pair[1])
+      request = storage.StorageObjectsRewriteRequest(
+          sourceBucket=src_bucket,
+          sourceObject=src_path,
+          destinationBucket=dest_bucket,
+          destinationObject=dest_path,
+          destinationKmsKeyName=dest_kms_key_name,
+          maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+      pair_to_request[pair] = request
+    pair_to_status = {}
+    while True:
+      pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
+      if not pairs_in_batch:
+        break
+      batch_request = BatchApiRequest(
+          batch_url=GCS_BATCH_ENDPOINT,
+          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
+          response_encoding='utf-8')
+      for pair in pairs_in_batch:
+        batch_request.Add(self.client.objects, 'Rewrite', pair_to_request[pair])
+      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+      for pair, api_call in zip(pairs_in_batch, api_calls):
+        src, dest = pair
+        response = api_call.response
+        if self._rewrite_cb is not None:
+          self._rewrite_cb(response)
+        if api_call.is_error:
+          exception = api_call.exception
+          # Translate 404 to the appropriate not found exception.
+          if isinstance(exception, HttpError) and exception.status_code == 404:
+            exception = (
+                GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
+          pair_to_status[pair] = exception
+        elif not response.done:
+          _LOGGER.debug(
+              'Rewrite progress: %d of %d bytes, %s to %s',
+              response.totalBytesRewritten,
+              response.objectSize,
+              src,
+              dest)
+          pair_to_request[pair].rewriteToken = response.rewriteToken
+        else:
+          _LOGGER.debug('Rewrite done: %s to %s', src, dest)
+          pair_to_status[pair] = None
 
-    return final_results
+    return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
 
   # We intentionally do not decorate this method with a retry, since the
-  # underlying copy and delete operations are already idempotent operations.
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
   def copytree(self, src, dest):
     """Renames the given GCS "directory" recursively from src to dest.
 
@@ -299,7 +465,8 @@ class GcsIO(object):
       self.copy(entry, dest + rel_path)
 
   # We intentionally do not decorate this method with a retry, since the
-  # underlying copy and delete operations are already idempotent operations.
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
   def rename(self, src, dest):
     """Renames the given GCS object from src to dest.
 
@@ -317,10 +484,15 @@ class GcsIO(object):
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
     try:
-      self._gcs_object(path)
+      self._gcs_object(path)  # gcs object
       return True
-    except NotFound:
-      return False
+    except HttpError as http_error:
+      if http_error.status_code == 404:
+        # HTTP 404 indicates that the file did not exist
+        return False
+      else:
+        # We re-raise all other exceptions
+        raise
 
   def checksum(self, path):
     """Looks up the checksum of a GCS object.
@@ -349,7 +521,7 @@ class GcsIO(object):
     Returns: KMS key name of the GCS object as a string, or None if it doesn't
       have one.
     """
-    return self._gcs_object(path).kms_key_name
+    return self._gcs_object(path).kmsKeyName
 
   def last_updated(self, path):
     """Returns the last updated epoch time of a single GCS object.
@@ -376,10 +548,10 @@ class GcsIO(object):
     file_status = {}
     if hasattr(gcs_object, 'crc32c'):
       file_status['checksum'] = gcs_object.crc32c
-    if hasattr(gcs_object, 'kms_key_name'):
-      file_status['kms_key'] = gcs_object.kms_key_name
+    if hasattr(gcs_object, 'kmsKeyName'):
+      file_status['kms_key'] = gcs_object.kmsKeyName
     if hasattr(gcs_object, 'updated'):
-      file_status['updated'] = self._updated_to_seconds(gcs_object.updated)
+      file_status['last_updated'] = self._updated_to_seconds(gcs_object.updated)
     if hasattr(gcs_object, 'size'):
       file_status['size'] = gcs_object.size
     return file_status
@@ -394,13 +566,10 @@ class GcsIO(object):
 
     Returns: GCS object.
     """
-    bucket_name, blob_name = parse_gcs_path(path)
-    bucket = self.client.get_bucket(bucket_name)
-    blob = bucket.get_blob(blob_name)
-    if blob:
-      return blob
-    else:
-      raise NotFound('Object %s not found', path)
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsGetRequest(
+        bucket=bucket, object=object_path)
+    return self.client.objects.Get(request)
 
   @deprecated(since='2.45.0', current='list_files')
   def list_prefix(self, path, with_metadata=False):
@@ -435,7 +604,8 @@ class GcsIO(object):
       ``with_metadata`` is True: generator of
       tuple(file name, tuple(size, timestamp)).
     """
-    bucket_name, prefix = parse_gcs_path(path, object_optional=True)
+    bucket, prefix = parse_gcs_path(path, object_optional=True)
+    request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
     file_info = set()
     counter = 0
     start_time = time.time()
@@ -443,26 +613,35 @@ class GcsIO(object):
       _LOGGER.debug("Starting the file information of the input")
     else:
       _LOGGER.debug("Starting the size estimation of the input")
-    bucket = self.client.get_bucket(bucket_name)
-    response = self.client.list_blobs(bucket, prefix=prefix)
-    for item in response:
-      file_name = 'gs://%s/%s' % (item.bucket.name, item.name)
-      if file_name not in file_info:
-        file_info.add(file_name)
-        counter += 1
-        if counter % 10000 == 0:
+    while True:
+      response = retry.with_exponential_backoff(
+          retry_filter=retry.retry_on_server_errors_and_timeout_filter)(
+              self.client.objects.List)(
+                  request)
+
+      for item in response.items:
+        file_name = 'gs://%s/%s' % (item.bucket, item.name)
+        if file_name not in file_info:
+          file_info.add(file_name)
+          counter += 1
+          if counter % 10000 == 0:
+            if with_metadata:
+              _LOGGER.info(
+                  "Finished computing file information of: %s files",
+                  len(file_info))
+            else:
+              _LOGGER.info(
+                  "Finished computing size of: %s files", len(file_info))
+
           if with_metadata:
-            _LOGGER.info(
-                "Finished computing file information of: %s files",
-                len(file_info))
+            yield file_name, (item.size, self._updated_to_seconds(item.updated))
           else:
-            _LOGGER.info("Finished computing size of: %s files", len(file_info))
-
-        if with_metadata:
-          yield file_name, (item.size, self._updated_to_seconds(item.updated))
-        else:
-          yield file_name, item.size
+            yield file_name, item.size
 
+      if response.nextPageToken:
+        request.pageToken = response.nextPageToken
+      else:
+        break
     _LOGGER.log(
         # do not spam logs when list_prefix is likely used to check empty folder
         logging.INFO if counter > 0 else logging.DEBUG,
@@ -478,18 +657,174 @@ class GcsIO(object):
         updated.microsecond / 1000000.0)
 
 
-class BeamBlobWriter(BlobWriter):
-  def __init__(
-      self, blob, content_type, chunk_size=16 * 1024 * 1024, ignore_flush=True):
-    super().__init__(
-        blob,
-        content_type=content_type,
-        chunk_size=chunk_size,
-        ignore_flush=ignore_flush)
-    self.mode = "w"
+class GcsDownloader(Downloader):
+  def __init__(self, client, path, buffer_size, get_project_number):
+    self._client = client
+    self._path = path
+    self._bucket, self._name = parse_gcs_path(path)
+    self._buffer_size = buffer_size
+    self._get_project_number = get_project_number
+
+    # Create a request count metric
+    resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.get',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: self._bucket
+    }
+    project_number = self._get_project_number(self._bucket)
+    if project_number:
+      labels[monitoring_infos.GCS_PROJECT_ID_LABEL] = str(project_number)
+    else:
+      _LOGGER.debug(
+          'Possibly missing storage.buckets.get permission to '
+          'bucket %s. Label %s is not added to the counter because it '
+          'cannot be identified.',
+          self._bucket,
+          monitoring_infos.GCS_PROJECT_ID_LABEL)
+
+    service_call_metric = ServiceCallMetric(
+        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        base_labels=labels)
+
+    # Get object state.
+    self._get_request = (
+        storage.StorageObjectsGetRequest(
+            bucket=self._bucket, object=self._name))
+    try:
+      metadata = self._get_object_metadata(self._get_request)
+    except HttpError as http_error:
+      service_call_metric.call(http_error)
+      if http_error.status_code == 404:
+        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
+      else:
+        _LOGGER.error(
+            'HTTP error while requesting file %s: %s', self._path, http_error)
+        raise
+    else:
+      service_call_metric.call('ok')
 
+    self._size = metadata.size
 
-class BeamBlobReader(BlobReader):
-  def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE):
-    super().__init__(blob, chunk_size=chunk_size)
-    self.mode = "r"
+    # Ensure read is from file of the correct generation.
+    self._get_request.generation = metadata.generation
+
+    # Initialize read buffer state.
+    self._download_stream = io.BytesIO()
+    self._downloader = transfer.Download(
+        self._download_stream,
+        auto_transfer=False,
+        chunksize=self._buffer_size,
+        num_retries=20)
+
+    try:
+      self._client.objects.Get(self._get_request, download=self._downloader)
+      service_call_metric.call('ok')
+    except HttpError as e:
+      service_call_metric.call(e)
+      raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _get_object_metadata(self, get_request):
+    return self._client.objects.Get(get_request)
+
+  @property
+  def size(self):
+    return self._size
+
+  def get_range(self, start, end):
+    self._download_stream.seek(0)
+    self._download_stream.truncate(0)
+    self._downloader.GetRange(start, end - 1)
+    return self._download_stream.getvalue()
+
+
+class GcsUploader(Uploader):
+  def __init__(self, client, path, mime_type, get_project_number):
+    self._client = client
+    self._path = path
+    self._bucket, self._name = parse_gcs_path(path)
+    self._mime_type = mime_type
+    self._get_project_number = get_project_number
+
+    # Set up communication with child thread.
+    parent_conn, child_conn = multiprocessing.Pipe()
+    self._child_conn = child_conn
+    self._conn = parent_conn
+
+    # Set up uploader.
+    self._insert_request = (
+        storage.StorageObjectsInsertRequest(
+            bucket=self._bucket, name=self._name))
+    self._upload = transfer.Upload(
+        PipeStream(self._child_conn),
+        self._mime_type,
+        chunksize=WRITE_CHUNK_SIZE)
+    self._upload.strategy = transfer.RESUMABLE_UPLOAD
+
+    # Start uploading thread.
+    self._upload_thread = threading.Thread(target=self._start_upload)
+    self._upload_thread.daemon = True
+    self._upload_thread.last_error = None
+    self._upload_thread.start()
+
+  # TODO(silviuc): Refactor so that retry logic can be applied.
+  # There is retry logic in the underlying transfer library but we should make
+  # it more explicit so we can control the retry parameters.
+  @retry.no_retries  # Using no_retries marks this as an integration point.
+  def _start_upload(self):
+    # This starts the uploader thread.  We are forced to run the uploader in
+    # another thread because the apitools uploader insists on taking a stream
+    # as input. Happily, this also means we get asynchronous I/O to GCS.
+    #
+    # The uploader by default transfers data in chunks of 1024 * 1024 bytes at
+    # a time, buffering writes until that size is reached.
+
+    project_number = self._get_project_number(self._bucket)
+
+    # Create a request count metric
+    resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.insert',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: self._bucket,
+        monitoring_infos.GCS_PROJECT_ID_LABEL: str(project_number)
+    }
+    service_call_metric = ServiceCallMetric(
+        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        base_labels=labels)
+    try:
+      self._client.objects.Insert(self._insert_request, upload=self._upload)
+      service_call_metric.call('ok')
+    except Exception as e:  # pylint: disable=broad-except
+      service_call_metric.call(e)
+      _LOGGER.error(
+          'Error in _start_upload while inserting file %s: %s',
+          self._path,
+          traceback.format_exc())
+      self._upload_thread.last_error = e
+    finally:
+      self._child_conn.close()
+
+  def put(self, data):
+    try:
+      self._conn.send_bytes(data.tobytes())
+    except EOFError:
+      if self._upload_thread.last_error is not None:
+        raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
+      raise
+
+  def finish(self):
+    self._conn.close()
+    # TODO(udim): Add timeout=DEFAULT_HTTP_TIMEOUT_SECONDS * 2 and raise if
+    # isAlive is True.
+    self._upload_thread.join()
+    # Check for exception since the last put() call.
+    if self._upload_thread.last_error is not None:
+      raise type(self._upload_thread.last_error)(
+          "Error while uploading file %s: %s",
+          self._path,
+          self._upload_thread.last_error.message)  # pylint: disable=raising-bad-type
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
index aabbdfd5c53..4ffbea0ba02 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
@@ -20,6 +20,21 @@
 Runs tests against Google Cloud Storage service.
 Instantiates a TestPipeline to get options such as GCP project name, but
 doesn't actually start a Beam pipeline or test any specific runner.
+
+Options:
+  --kms_key_name=projects/<project-name>/locations/<region>/keyRings/\
+      <key-ring-name>/cryptoKeys/<key-name>/cryptoKeyVersions/<version>
+    Pass a Cloud KMS key name to test GCS operations using customer managed
+    encryption keys (CMEK).
+
+Cloud KMS permissions:
+The project's Cloud Storage service account requires Encrypter/Decrypter
+permissions for the key specified in --kms_key_name.
+
+To run these tests manually:
+  ./gradlew :sdks:python:test-suites:dataflow:integrationTest \
+    -Dtests=apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest \
+    -DkmsKeyName=KMS_KEY_NAME
 """
 
 # pytype: skip-file
@@ -43,6 +58,9 @@ except ImportError:
 class GcsIOIntegrationTest(unittest.TestCase):
 
   INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt'
+  # Larger than 1MB to test maxBytesRewrittenPerCall.
+  # Also needs to be in a different region than the dest to take effect.
+  INPUT_FILE_LARGE = 'gs://apache-beam-samples-us-east1/wikipedia_edits/wiki_data-000000000000.json'  # pylint: disable=line-too-long
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
@@ -56,55 +74,126 @@ class GcsIOIntegrationTest(unittest.TestCase):
     self.gcs_tempdir = (
         self.test_pipeline.get_option('temp_location') + '/gcs_it-' +
         str(uuid.uuid4()))
+    self.kms_key_name = self.test_pipeline.get_option('kms_key_name')
     self.gcsio = gcsio.GcsIO()
 
   def tearDown(self):
     FileSystems.delete([self.gcs_tempdir + '/'])
 
-  def _verify_copy(self, src, dest, dest_kms_key_name=None):
-    self.assertTrue(
-        FileSystems.exists(src), 'source file does not exist: %s' % src)
-    self.assertTrue(
-        FileSystems.exists(dest),
-        'copied file not present in destination: %s' % dest)
+  def _verify_copy(self, src, dst, dst_kms_key_name=None):
+    self.assertTrue(FileSystems.exists(src), 'src does not exist: %s' % src)
+    self.assertTrue(FileSystems.exists(dst), 'dst does not exist: %s' % dst)
     src_checksum = self.gcsio.checksum(src)
-    dest_checksum = self.gcsio.checksum(dest)
-    self.assertEqual(src_checksum, dest_checksum)
-    actual_dest_kms_key = self.gcsio.kms_key(dest)
-    if actual_dest_kms_key is None:
-      self.assertEqual(actual_dest_kms_key, dest_kms_key_name)
+    dst_checksum = self.gcsio.checksum(dst)
+    self.assertEqual(src_checksum, dst_checksum)
+    actual_dst_kms_key = self.gcsio.kms_key(dst)
+    if actual_dst_kms_key is None:
+      self.assertEqual(actual_dst_kms_key, dst_kms_key_name)
     else:
       self.assertTrue(
-          actual_dest_kms_key.startswith(dest_kms_key_name),
+          actual_dst_kms_key.startswith(dst_kms_key_name),
           "got: %s, wanted startswith: %s" %
-          (actual_dest_kms_key, dest_kms_key_name))
+          (actual_dst_kms_key, dst_kms_key_name))
+
+  def _test_copy(
+      self,
+      name,
+      kms_key_name=None,
+      max_bytes_rewritten_per_call=None,
+      src=None):
+    src = src or self.INPUT_FILE
+    dst = self.gcs_tempdir + '/%s' % name
+    extra_kwargs = {}
+    if max_bytes_rewritten_per_call is not None:
+      extra_kwargs['max_bytes_rewritten_per_call'] = (
+          max_bytes_rewritten_per_call)
+
+    self.gcsio.copy(src, dst, kms_key_name, **extra_kwargs)
+    self._verify_copy(src, dst, kms_key_name)
 
   @pytest.mark.it_postcommit
   def test_copy(self):
-    src = self.INPUT_FILE
-    dest = self.gcs_tempdir + '/test_copy'
+    self._test_copy("test_copy")
 
-    self.gcsio.copy(src, dest)
-    self._verify_copy(src, dest)
+  @pytest.mark.it_postcommit
+  def test_copy_kms(self):
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+    self._test_copy("test_copy_kms", self.kms_key_name)
 
   @pytest.mark.it_postcommit
-  def test_batch_copy_and_delete(self):
+  def test_copy_rewrite_token(self):
+    # Tests a multi-part copy (rewrite) operation. This is triggered by a
+    # combination of 3 conditions:
+    #  - a large enough src
+    #  - setting max_bytes_rewritten_per_call
+    #  - setting kms_key_name
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+
+    rewrite_responses = []
+    self.gcsio._set_rewrite_response_callback(
+        lambda response: rewrite_responses.append(response))
+    self._test_copy(
+        "test_copy_rewrite_token",
+        kms_key_name=self.kms_key_name,
+        max_bytes_rewritten_per_call=50 * 1024 * 1024,
+        src=self.INPUT_FILE_LARGE)
+    # Verify that there was a multi-part rewrite.
+    self.assertTrue(any(not r.done for r in rewrite_responses))
+
+  def _test_copy_batch(
+      self,
+      name,
+      kms_key_name=None,
+      max_bytes_rewritten_per_call=None,
+      src=None):
     num_copies = 10
-    srcs = [self.INPUT_FILE] * num_copies
-    dests = [
-        self.gcs_tempdir + '/test_copy_batch_%d' % i for i in range(num_copies)
-    ]
-    src_dest_pairs = list(zip(srcs, dests))
+    srcs = [src or self.INPUT_FILE] * num_copies
+    dsts = [self.gcs_tempdir + '/%s_%d' % (name, i) for i in range(num_copies)]
+    src_dst_pairs = list(zip(srcs, dsts))
+    extra_kwargs = {}
+    if max_bytes_rewritten_per_call is not None:
+      extra_kwargs['max_bytes_rewritten_per_call'] = (
+          max_bytes_rewritten_per_call)
+
+    result_statuses = self.gcsio.copy_batch(
+        src_dst_pairs, kms_key_name, **extra_kwargs)
+    for status in result_statuses:
+      self.assertIsNone(status[2], status)
+    for _src, _dst in src_dst_pairs:
+      self._verify_copy(_src, _dst, kms_key_name)
 
-    self.gcsio.copy_batch(src_dest_pairs)
+  @pytest.mark.it_postcommit
+  def test_copy_batch(self):
+    self._test_copy_batch("test_copy_batch")
 
-    for src, dest in src_dest_pairs:
-      self._verify_copy(src, dest)
+  @pytest.mark.it_postcommit
+  def test_copy_batch_kms(self):
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+    self._test_copy_batch("test_copy_batch_kms", self.kms_key_name)
 
-    self.gcsio.delete_batch(dests)
-    for dest in dests:
-      self.assertFalse(
-          FileSystems.exists(dest), 'deleted file still exists: %s' % dest)
+  @pytest.mark.it_postcommit
+  def test_copy_batch_rewrite_token(self):
+    # Tests a multi-part copy (rewrite) operation. This is triggered by a
+    # combination of 3 conditions:
+    #  - a large enough src
+    #  - setting max_bytes_rewritten_per_call
+    #  - setting kms_key_name
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+
+    rewrite_responses = []
+    self.gcsio._set_rewrite_response_callback(
+        lambda response: rewrite_responses.append(response))
+    self._test_copy_batch(
+        "test_copy_batch_rewrite_token",
+        kms_key_name=self.kms_key_name,
+        max_bytes_rewritten_per_call=50 * 1024 * 1024,
+        src=self.INPUT_FILE_LARGE)
+    # Verify that there was a multi-part rewrite.
+    self.assertTrue(any(not r.done for r in rewrite_responses))
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_overrides.py b/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
new file mode 100644
index 00000000000..fc06cb28f1a
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+import math
+import time
+
+from apache_beam.metrics.metric import Metrics
+from apitools.base.py import exceptions
+from apitools.base.py import http_wrapper
+from apitools.base.py import util
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class GcsIOOverrides(object):
+  """Functions for overriding Google Cloud Storage I/O client."""
+
+  _THROTTLED_SECS = Metrics.counter('StorageV1', "cumulativeThrottlingSeconds")
+
+  @classmethod
+  def retry_func(cls, retry_args):
+    # handling GCS download throttling errors (BEAM-7424)
+    if (isinstance(retry_args.exc, exceptions.BadStatusCodeError) and
+        retry_args.exc.status_code == http_wrapper.TOO_MANY_REQUESTS):
+      _LOGGER.debug(
+          'Caught GCS quota error (%s), retrying.', retry_args.exc.status_code)
+    else:
+      return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)
+
+    http_wrapper.RebuildHttpConnections(retry_args.http)
+    _LOGGER.debug(
+        'Retrying request to url %s after exception %s',
+        retry_args.http_request.url,
+        retry_args.exc)
+    sleep_seconds = util.CalculateWaitForRetry(
+        retry_args.num_retries, max_wait=retry_args.max_retry_wait)
+    cls._THROTTLED_SECS.inc(math.ceil(sleep_seconds))
+    time.sleep(sleep_seconds)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index f8b580c91c9..9cc5a9e1df0 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -18,147 +18,257 @@
 """Tests for Google Cloud Storage client."""
 # pytype: skip-file
 
+import datetime
+import errno
+import io
 import logging
 import os
+import random
+import time
 import unittest
-from datetime import datetime
+from email.message import Message
 
+import httplib2
 import mock
 
+# Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
+import apache_beam
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.metricbase import MetricName
 
 try:
-  from apache_beam.io.gcp import gcsio
-  from google.cloud.exceptions import BadRequest, NotFound
+  from apache_beam.io.gcp import gcsio, resource_identifiers
+  from apache_beam.io.gcp.internal.clients import storage
+  from apitools.base.py.exceptions import HttpError
 except ImportError:
-  NotFound = None
+  HttpError = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
 DEFAULT_GCP_PROJECT = 'apache-beam-testing'
+DEFAULT_PROJECT_NUMBER = 1
 
 
 class FakeGcsClient(object):
-  # Fake storage client.
+  # Fake storage client.  Usage in gcsio.py is client.objects.Get(...) and
+  # client.objects.Insert(...).
 
   def __init__(self):
-    self.buckets = {}
+    self.objects = FakeGcsObjects()
+    self.buckets = FakeGcsBuckets()
+    # Referenced in GcsIO.copy_batch() and GcsIO.delete_batch().
+    self._http = object()
 
-  def create_bucket(self, name):
-    self.buckets[name] = FakeBucket(self, name)
-    return self.buckets[name]
 
-  def get_bucket(self, name):
-    if name in self.buckets:
-      return self.buckets[name]
-    else:
-      raise NotFound("Bucket not found")
+class FakeFile(object):
+  def __init__(
+      self, bucket, obj, contents, generation, crc32c=None, last_updated=None):
+    self.bucket = bucket
+    self.object = obj
+    self.contents = contents
+    self.generation = generation
+    self.crc32c = crc32c
+    self.last_updated = last_updated
+
+  def get_metadata(self):
+    last_updated_datetime = None
+    if self.last_updated:
+      last_updated_datetime = datetime.datetime.utcfromtimestamp(
+          self.last_updated)
+
+    return storage.Object(
+        bucket=self.bucket,
+        name=self.object,
+        generation=self.generation,
+        size=len(self.contents),
+        crc32c=self.crc32c,
+        updated=last_updated_datetime)
 
-  def lookup_bucket(self, name):
-    if name in self.buckets:
-      return self.buckets[name]
-    else:
-      return self.create_bucket(name)
 
-  def batch(self):
+class FakeGcsBuckets(object):
+  def __init__(self):
     pass
 
-  def add_file(self, bucket, blob, contents):
-    folder = self.lookup_bucket(bucket)
-    holder = folder.lookup_blob(blob)
-    holder.contents = contents
+  def get_bucket(self, bucket):
+    return storage.Bucket(name=bucket, projectNumber=DEFAULT_PROJECT_NUMBER)
+
+  def Get(self, get_request):
+    return self.get_bucket(get_request.bucket)
 
-  def get_file(self, bucket, blob):
-    folder = self.get_bucket(bucket.name)
-    holder = folder.get_blob(blob.name)
-    return holder
 
-  def list_blobs(self, bucket_or_path, prefix=None):
-    bucket = self.get_bucket(bucket_or_path.name)
-    if not prefix:
-      return list(bucket.blobs.values())
+class FakeGcsObjects(object):
+  def __init__(self):
+    self.files = {}
+    # Store the last generation used for a given object name.  Note that this
+    # has to persist even past the deletion of the object.
+    self.last_generation = {}
+    self.list_page_tokens = {}
+    self._fail_when_getting_metadata = []
+    self._fail_when_reading = []
+
+  def add_file(
+      self, f, fail_when_getting_metadata=False, fail_when_reading=False):
+    self.files[(f.bucket, f.object)] = f
+    self.last_generation[(f.bucket, f.object)] = f.generation
+    if fail_when_getting_metadata:
+      self._fail_when_getting_metadata.append(f)
+    if fail_when_reading:
+      self._fail_when_reading.append(f)
+
+  def get_file(self, bucket, obj):
+    return self.files.get((bucket, obj), None)
+
+  def delete_file(self, bucket, obj):
+    del self.files[(bucket, obj)]
+
+  def get_last_generation(self, bucket, obj):
+    return self.last_generation.get((bucket, obj), 0)
+
+  def Get(self, get_request, download=None):  # pylint: disable=invalid-name
+    f = self.get_file(get_request.bucket, get_request.object)
+    if f is None:
+      # Failing with an HTTP 404 if file does not exist.
+      raise HttpError({'status': 404}, None, None)
+    if download is None:
+      if f in self._fail_when_getting_metadata:
+        raise HttpError({'status': 429}, None, None)
+      return f.get_metadata()
     else:
-      output = []
-      for name in list(bucket.blobs):
-        if name[0:len(prefix)] == prefix:
-          output.append(bucket.blobs[name])
-      return output
-
-
-class FakeBucket(object):
-  #Fake bucket for storing test blobs locally.
-
-  def __init__(self, client, name):
-    self.client = client
-    self.name = name
-    self.blobs = {}
-    self.default_kms_key_name = None
-    self.client.buckets[name] = self
-
-  def add_blob(self, blob):
-    self.blobs[blob.name] = blob
-
-  def create_blob(self, name):
-    return FakeBlob(name, self)
-
-  def copy_blob(self, blob, dest, new_name=None):
-    if not new_name:
-      new_name = blob.name
-    dest.blobs[new_name] = blob
-    dest.blobs[new_name].name = new_name
-    dest.blobs[new_name].bucket = dest
-    return dest.blobs[new_name]
-
-  def get_blob(self, blob_name):
-    if blob_name in self.blobs:
-      return self.blobs[blob_name]
+      if f in self._fail_when_reading:
+        raise HttpError({'status': 429}, None, None)
+      stream = download.stream
+
+      def get_range_callback(start, end):
+        if not 0 <= start <= end < len(f.contents):
+          raise ValueError(
+              'start=%d end=%d len=%s' % (start, end, len(f.contents)))
+        stream.write(f.contents[start:end + 1])
+
+      download.GetRange = get_range_callback
+
+  def Insert(self, insert_request, upload=None):  # pylint: disable=invalid-name
+    assert upload is not None
+    generation = self.get_last_generation(
+        insert_request.bucket, insert_request.name) + 1
+    f = FakeFile(insert_request.bucket, insert_request.name, b'', generation)
+
+    # Stream data into file.
+    stream = upload.stream
+    data_list = []
+    while True:
+      data = stream.read(1024 * 1024)
+      if not data:
+        break
+      data_list.append(data)
+    f.contents = b''.join(data_list)
+
+    self.add_file(f)
+
+  REWRITE_TOKEN = 'test_token'
+
+  def Rewrite(self, rewrite_request):  # pylint: disable=invalid-name
+    if rewrite_request.rewriteToken == self.REWRITE_TOKEN:
+      dest_object = storage.Object()
+      return storage.RewriteResponse(
+          done=True,
+          objectSize=100,
+          resource=dest_object,
+          totalBytesRewritten=100)
+
+    src_file = self.get_file(
+        rewrite_request.sourceBucket, rewrite_request.sourceObject)
+    if not src_file:
+      raise HttpError(
+          httplib2.Response({'status': '404'}),
+          '404 Not Found',
+          'https://fake/url')
+    generation = self.get_last_generation(
+        rewrite_request.destinationBucket,
+        rewrite_request.destinationObject) + 1
+    dest_file = FakeFile(
+        rewrite_request.destinationBucket,
+        rewrite_request.destinationObject,
+        src_file.contents,
+        generation)
+    self.add_file(dest_file)
+    time.sleep(10)  # time.sleep and time.time are mocked below.
+    return storage.RewriteResponse(
+        done=False,
+        objectSize=100,
+        rewriteToken=self.REWRITE_TOKEN,
+        totalBytesRewritten=5)
+
+  def Delete(self, delete_request):  # pylint: disable=invalid-name
+    # Here, we emulate the behavior of the GCS service in raising a 404 error
+    # if this object already exists.
+    if self.get_file(delete_request.bucket, delete_request.object):
+      self.delete_file(delete_request.bucket, delete_request.object)
     else:
-      return None
-
-  def lookup_blob(self, name):
-    if name in self.blobs:
-      return self.blobs[name]
+      raise HttpError(
+          httplib2.Response({'status': '404'}),
+          '404 Not Found',
+          'https://fake/url')
+
+  def List(self, list_request):  # pylint: disable=invalid-name
+    bucket = list_request.bucket
+    prefix = list_request.prefix or ''
+    matching_files = []
+    for file_bucket, file_name in sorted(iter(self.files)):
+      if bucket == file_bucket and file_name.startswith(prefix):
+        file_object = self.files[(file_bucket, file_name)].get_metadata()
+        matching_files.append(file_object)
+
+    # Handle pagination.
+    items_per_page = 5
+    if not list_request.pageToken:
+      range_start = 0
     else:
-      return self.create_blob(name)
-
-  def set_default_kms_key_name(self, name):
-    self.default_kms_key_name = name
-
-  def delete_blob(self, name):
-    if name in self.blobs:
-      del self.blobs[name]
-
-
-class FakeBlob(object):
-  def __init__(
-      self,
-      name,
-      bucket,
-      size=0,
-      contents=None,
-      generation=1,
-      crc32c=None,
-      kms_key_name=None,
-      updated=None,
-      fail_when_getting_metadata=False,
-      fail_when_reading=False):
-    self.name = name
-    self.bucket = bucket
-    self.size = size
-    self.contents = contents
-    self._generation = generation
-    self.crc32c = crc32c
-    self.kms_key_name = kms_key_name
-    self.updated = updated
-    self._fail_when_getting_metadata = fail_when_getting_metadata
-    self._fail_when_reading = fail_when_reading
-    self.bucket.add_blob(self)
-
-  def delete(self):
-    if self.name in self.bucket.blobs:
-      del self.bucket.blobs[self.name]
-
-
-@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
+      if list_request.pageToken not in self.list_page_tokens:
+        raise ValueError('Invalid page token.')
+      range_start = self.list_page_tokens[list_request.pageToken]
+      del self.list_page_tokens[list_request.pageToken]
+
+    result = storage.Objects(
+        items=matching_files[range_start:range_start + items_per_page])
+    if range_start + items_per_page < len(matching_files):
+      next_range_start = range_start + items_per_page
+      next_page_token = '_page_token_%s_%s_%d' % (
+          bucket, prefix, next_range_start)
+      self.list_page_tokens[next_page_token] = next_range_start
+      result.nextPageToken = next_page_token
+    return result
+
+
+class FakeApiCall(object):
+  def __init__(self, exception, response):
+    self.exception = exception
+    self.is_error = exception is not None
+    # Response for Rewrite:
+    self.response = response
+
+
+class FakeBatchApiRequest(object):
+  def __init__(self, **unused_kwargs):
+    self.operations = []
+
+  def Add(self, service, method, request):  # pylint: disable=invalid-name
+    self.operations.append((service, method, request))
+
+  def Execute(self, unused_http, **unused_kwargs):  # pylint: disable=invalid-name
+    api_calls = []
+    for service, method, request in self.operations:
+      exception = None
+      response = None
+      try:
+        response = getattr(service, method)(request)
+      except Exception as e:  # pylint: disable=broad-except
+        exception = e
+      api_calls.append(FakeApiCall(exception, response))
+    return api_calls
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestGCSPathParser(unittest.TestCase):
 
   BAD_GCS_PATHS = [
@@ -200,36 +310,34 @@ class SampleOptions(object):
     self.dataflow_kms_key = kms_key
 
 
-@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+@mock.patch.multiple(
+    'time', time=mock.MagicMock(side_effect=range(100)), sleep=mock.MagicMock())
 class TestGCSIO(unittest.TestCase):
   def _insert_random_file(
       self,
       client,
       path,
-      size=0,
+      size,
+      generation=1,
       crc32c=None,
-      kms_key_name=None,
-      updated=None,
+      last_updated=None,
       fail_when_getting_metadata=False,
       fail_when_reading=False):
-    bucket_name, blob_name = gcsio.parse_gcs_path(path)
-    bucket = client.lookup_bucket(bucket_name)
-    blob = FakeBlob(
-        blob_name,
+    bucket, name = gcsio.parse_gcs_path(path)
+    f = FakeFile(
         bucket,
-        size=size,
-        contents=os.urandom(size),
+        name,
+        os.urandom(size),
+        generation,
         crc32c=crc32c,
-        kms_key_name=kms_key_name,
-        updated=updated,
-        fail_when_getting_metadata=fail_when_getting_metadata,
-        fail_when_reading=fail_when_reading)
-    return blob
+        last_updated=last_updated)
+    client.objects.add_file(f, fail_when_getting_metadata, fail_when_reading)
+    return f
 
   def setUp(self):
     self.client = FakeGcsClient()
     self.gcs = gcsio.GcsIO(self.client)
-    self.client.create_bucket("gcsio-test")
 
   def test_default_bucket_name(self):
     self.assertEqual(
@@ -243,6 +351,16 @@ class TestGCSIO(unittest.TestCase):
                 DEFAULT_GCP_PROJECT, "us-central1", kms_key="kmskey!")),
         None)
 
+  def test_num_retries(self):
+    # BEAM-7424: update num_retries accordingly if storage_client is
+    # regenerated.
+    self.assertEqual(gcsio.GcsIO().client.num_retries, 20)
+
+  def test_retry_func(self):
+    # BEAM-7667: update retry_func accordingly if storage_client is
+    # regenerated.
+    self.assertIsNotNone(gcsio.GcsIO().client.retry_func)
+
   def test_exists(self):
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
@@ -250,16 +368,17 @@ class TestGCSIO(unittest.TestCase):
     self.assertFalse(self.gcs.exists(file_name + 'xyz'))
     self.assertTrue(self.gcs.exists(file_name))
 
-  @mock.patch.object(FakeBucket, 'get_blob')
+  @mock.patch.object(FakeGcsObjects, 'Get')
   def test_exists_failure(self, mock_get):
     # Raising an error other than 404. Raising 404 is a valid failure for
     # exists() call.
-    mock_get.side_effect = BadRequest("Try again")
+    mock_get.side_effect = HttpError({'status': 400}, None, None)
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
     self._insert_random_file(self.client, file_name, file_size)
-    with self.assertRaises(BadRequest):
+    with self.assertRaises(HttpError) as cm:
       self.gcs.exists(file_name)
+    self.assertEqual(400, cm.exception.status_code)
 
   def test_checksum(self):
     file_name = 'gs://gcsio-test/dummy_file'
@@ -277,100 +396,179 @@ class TestGCSIO(unittest.TestCase):
     self.assertTrue(self.gcs.exists(file_name))
     self.assertEqual(1234, self.gcs.size(file_name))
 
-  def test_kms_key(self):
-    file_name = 'gs://gcsio-test/dummy_file'
-    file_size = 1234
-    kms_key_name = "dummy"
-
-    self._insert_random_file(
-        self.client, file_name, file_size, kms_key_name=kms_key_name)
-    self.assertTrue(self.gcs.exists(file_name))
-    self.assertEqual(kms_key_name, self.gcs.kms_key(file_name))
-
   def test_last_updated(self):
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
-    updated = datetime.fromtimestamp(123456.78)
+    last_updated = 123456.78
 
-    self._insert_random_file(self.client, file_name, file_size, updated=updated)
+    self._insert_random_file(
+        self.client, file_name, file_size, last_updated=last_updated)
     self.assertTrue(self.gcs.exists(file_name))
-    self.assertEqual(
-        gcsio.GcsIO._updated_to_seconds(updated),
-        self.gcs.last_updated(file_name))
+    self.assertEqual(last_updated, self.gcs.last_updated(file_name))
 
   def test_file_status(self):
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
-    updated = datetime.fromtimestamp(123456.78)
+    last_updated = 123456.78
     checksum = 'deadbeef'
 
     self._insert_random_file(
-        self.client, file_name, file_size, updated=updated, crc32c=checksum)
+        self.client,
+        file_name,
+        file_size,
+        last_updated=last_updated,
+        crc32c=checksum)
     file_checksum = self.gcs.checksum(file_name)
 
     file_status = self.gcs._status(file_name)
 
     self.assertEqual(file_status['size'], file_size)
     self.assertEqual(file_status['checksum'], file_checksum)
-    self.assertEqual(
-        file_status['updated'], gcsio.GcsIO._updated_to_seconds(updated))
+    self.assertEqual(file_status['last_updated'], last_updated)
 
-  def test_file_mode_calls(self):
+  def test_file_mode(self):
     file_name = 'gs://gcsio-test/dummy_mode_file'
-    self._insert_random_file(self.client, file_name)
-    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobWriter') as writer:
-      self.gcs.open(file_name, 'wb')
-      writer.assert_called()
-    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobReader') as reader:
-      self.gcs.open(file_name, 'rb')
-      reader.assert_called()
+    with self.gcs.open(file_name, 'wb') as f:
+      assert f.mode == 'wb'
+    with self.gcs.open(file_name, 'rb') as f:
+      assert f.mode == 'rb'
 
   def test_bad_file_modes(self):
     file_name = 'gs://gcsio-test/dummy_mode_file'
-    self._insert_random_file(self.client, file_name)
     with self.assertRaises(ValueError):
       self.gcs.open(file_name, 'w+')
     with self.assertRaises(ValueError):
       self.gcs.open(file_name, 'r+b')
 
+  def test_empty_batches(self):
+    self.assertEqual([], self.gcs.copy_batch([]))
+    self.assertEqual([], self.gcs.delete_batch([]))
+
   def test_delete(self):
     file_name = 'gs://gcsio-test/delete_me'
     file_size = 1024
-    bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
+
     # Test deletion of non-existent file.
-    bucket = self.client.get_bucket(bucket_name)
     self.gcs.delete(file_name)
 
     self._insert_random_file(self.client, file_name, file_size)
-    self.assertTrue(blob_name in bucket.blobs)
+    self.assertTrue(
+        gcsio.parse_gcs_path(file_name) in self.client.objects.files)
 
     self.gcs.delete(file_name)
 
-    self.assertFalse(blob_name in bucket.blobs)
+    self.assertFalse(
+        gcsio.parse_gcs_path(file_name) in self.client.objects.files)
+
+  @mock.patch(
+      'apache_beam.io.gcp.gcsio.auth.get_service_credentials',
+      wraps=lambda pipeline_options: None)
+  @mock.patch('apache_beam.io.gcp.gcsio.get_new_http')
+  def test_user_agent_passed(self, get_new_http_mock, get_service_creds_mock):
+    client = gcsio.GcsIO()
+    try:
+      client.get_bucket('mabucket')
+    except:  # pylint: disable=bare-except
+      # Ignore errors. The errors come from the fact that we did not mock
+      # the response from the API, so the overall get_bucket call fails
+      # soon after the GCS API is called.
+      pass
+    call = get_new_http_mock.return_value.request.mock_calls[-2]
+    self.assertIn(
+        "apache-beam/%s (GPN:Beam)" % apache_beam.__version__,
+        call[2]['headers']['User-Agent'])
+
+  @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
+  def test_delete_batch(self, *unused_args):
+    gcsio.BatchApiRequest = FakeBatchApiRequest
+    file_name_pattern = 'gs://gcsio-test/delete_me_%d'
+    file_size = 1024
+    num_files = 10
+
+    # Test deletion of non-existent files.
+    result = self.gcs.delete_batch(
+        [file_name_pattern % i for i in range(num_files)])
+    self.assertTrue(result)
+    for i, (file_name, exception) in enumerate(result):
+      self.assertEqual(file_name, file_name_pattern % i)
+      self.assertEqual(exception, None)
+      self.assertFalse(self.gcs.exists(file_name_pattern % i))
+
+    # Insert some files.
+    for i in range(num_files):
+      self._insert_random_file(self.client, file_name_pattern % i, file_size)
+
+    # Check files inserted properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(file_name_pattern % i))
+
+    # Execute batch delete.
+    self.gcs.delete_batch([file_name_pattern % i for i in range(num_files)])
+
+    # Check files deleted properly.
+    for i in range(num_files):
+      self.assertFalse(self.gcs.exists(file_name_pattern % i))
 
   def test_copy(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'
-    src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
-    dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
-    src_bucket = self.client.lookup_bucket(src_bucket_name)
-    dest_bucket = self.client.lookup_bucket(dest_bucket_name)
     file_size = 1024
     self._insert_random_file(self.client, src_file_name, file_size)
-    self.assertTrue(src_blob_name in src_bucket.blobs)
-    self.assertFalse(dest_blob_name in dest_bucket.blobs)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertFalse(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
-    self.gcs.copy(src_file_name, dest_file_name)
+    self.gcs.copy(src_file_name, dest_file_name, dest_kms_key_name='kms_key')
 
-    self.assertTrue(src_blob_name in src_bucket.blobs)
-    self.assertTrue(dest_blob_name in dest_bucket.blobs)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertTrue(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     # Test copy of non-existent files.
-    with self.assertRaises(NotFound):
+    with self.assertRaisesRegex(HttpError, r'Not Found'):
       self.gcs.copy(
           'gs://gcsio-test/non-existent',
           'gs://gcsio-test/non-existent-destination')
 
+  @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
+  def test_copy_batch(self, *unused_args):
+    gcsio.BatchApiRequest = FakeBatchApiRequest
+    from_name_pattern = 'gs://gcsio-test/copy_me_%d'
+    to_name_pattern = 'gs://gcsio-test/destination_%d'
+    file_size = 1024
+    num_files = 10
+
+    result = self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
+                                  for i in range(num_files)],
+                                 dest_kms_key_name='kms_key')
+    self.assertTrue(result)
+    for i, (src, dest, exception) in enumerate(result):
+      self.assertEqual(src, from_name_pattern % i)
+      self.assertEqual(dest, to_name_pattern % i)
+      self.assertTrue(isinstance(exception, IOError))
+      self.assertEqual(exception.errno, errno.ENOENT)
+      self.assertFalse(self.gcs.exists(from_name_pattern % i))
+      self.assertFalse(self.gcs.exists(to_name_pattern % i))
+
+    # Insert some files.
+    for i in range(num_files):
+      self._insert_random_file(self.client, from_name_pattern % i, file_size)
+
+    # Check files inserted properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(from_name_pattern % i))
+
+    # Execute batch copy.
+    self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
+                         for i in range(num_files)])
+
+    # Check files copied properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(from_name_pattern % i))
+      self.assertTrue(self.gcs.exists(to_name_pattern % i))
+
   def test_copytree(self):
     src_dir_name = 'gs://gcsio-test/source/'
     dest_dir_name = 'gs://gcsio-test/dest/'
@@ -379,62 +577,204 @@ class TestGCSIO(unittest.TestCase):
     for path in paths:
       src_file_name = src_dir_name + path
       dest_file_name = dest_dir_name + path
-      src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
-      dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
-      src_bucket = self.client.lookup_bucket(src_bucket_name)
-      dest_bucket = self.client.lookup_bucket(dest_bucket_name)
-      file_size = 1024
       self._insert_random_file(self.client, src_file_name, file_size)
-      self.assertTrue(src_blob_name in src_bucket.blobs)
-      self.assertFalse(dest_blob_name in dest_bucket.blobs)
+      self.assertTrue(
+          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+      self.assertFalse(
+          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     self.gcs.copytree(src_dir_name, dest_dir_name)
 
     for path in paths:
       src_file_name = src_dir_name + path
       dest_file_name = dest_dir_name + path
-      src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
-      dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
-      src_bucket = self.client.lookup_bucket(src_bucket_name)
-      dest_bucket = self.client.lookup_bucket(dest_bucket_name)
-      self.assertTrue(src_blob_name in src_bucket.blobs)
-      self.assertTrue(dest_blob_name in dest_bucket.blobs)
+      self.assertTrue(
+          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+      self.assertTrue(
+          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
   def test_rename(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'
-    src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
-    dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
     file_size = 1024
     self._insert_random_file(self.client, src_file_name, file_size)
-    src_bucket = self.client.lookup_bucket(src_bucket_name)
-    dest_bucket = self.client.lookup_bucket(dest_bucket_name)
-    self.assertTrue(src_blob_name in src_bucket.blobs)
-    self.assertFalse(dest_blob_name in dest_bucket.blobs)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertFalse(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     self.gcs.rename(src_file_name, dest_file_name)
 
-    self.assertFalse(src_blob_name in src_bucket.blobs)
-    self.assertTrue(dest_blob_name in dest_bucket.blobs)
+    self.assertFalse(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertTrue(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
+
+  def test_full_file_read(self):
+    file_name = 'gs://gcsio-test/full_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    f = self.gcs.open(file_name)
+    self.assertEqual(f.mode, 'r')
+    f.seek(0, os.SEEK_END)
+    self.assertEqual(f.tell(), file_size)
+    self.assertEqual(f.read(), b'')
+    f.seek(0)
+    self.assertEqual(f.read(), random_file.contents)
+
+  def test_file_random_seek(self):
+    file_name = 'gs://gcsio-test/seek_file'
+    file_size = 5 * 1024 * 1024 - 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+
+    f = self.gcs.open(file_name)
+    random.seed(0)
+    for _ in range(0, 10):
+      a = random.randint(0, file_size - 1)
+      b = random.randint(0, file_size - 1)
+      start, end = min(a, b), max(a, b)
+      f.seek(start)
+      self.assertEqual(f.tell(), start)
+      self.assertEqual(
+          f.read(end - start + 1), random_file.contents[start:end + 1])
+      self.assertEqual(f.tell(), end + 1)
+
+  def test_file_iterator(self):
+    file_name = 'gs://gcsio-test/iterating_file'
+    lines = []
+    line_count = 10
+    for _ in range(line_count):
+      line_length = random.randint(100, 500)
+      line = os.urandom(line_length).replace(b'\n', b' ') + b'\n'
+      lines.append(line)
+
+    contents = b''.join(lines)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.client.objects.add_file(FakeFile(bucket, name, contents, 1))
+
+    f = self.gcs.open(file_name)
+
+    read_lines = 0
+    for line in f:
+      read_lines += 1
 
-  def test_file_buffered_read_call(self):
+    self.assertEqual(read_lines, line_count)
+
+  def test_file_read_line(self):
     file_name = 'gs://gcsio-test/read_line_file'
+    lines = []
+
+    # Set a small buffer size to exercise refilling the buffer.
+    # First line is carefully crafted so the newline falls as the last character
+    # of the buffer to exercise this code path.
     read_buffer_size = 1024
-    self._insert_random_file(self.client, file_name, 10240)
+    lines.append(b'x' * 1023 + b'\n')
+
+    for _ in range(1, 1000):
+      line_length = random.randint(100, 500)
+      line = os.urandom(line_length).replace(b'\n', b' ') + b'\n'
+      lines.append(line)
+    contents = b''.join(lines)
+
+    file_size = len(contents)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.client.objects.add_file(FakeFile(bucket, name, contents, 1))
+
+    f = self.gcs.open(file_name, read_buffer_size=read_buffer_size)
+
+    # Test read of first two lines.
+    f.seek(0)
+    self.assertEqual(f.readline(), lines[0])
+    self.assertEqual(f.tell(), len(lines[0]))
+    self.assertEqual(f.readline(), lines[1])
+
+    # Test read at line boundary.
+    f.seek(file_size - len(lines[-1]) - 1)
+    self.assertEqual(f.readline(), b'\n')
+
+    # Test read at end of file.
+    f.seek(file_size)
+    self.assertEqual(f.readline(), b'')
+
+    # Test reads at random positions.
+    random.seed(0)
+    for _ in range(0, 10):
+      start = random.randint(0, file_size - 1)
+      line_index = 0
+      # Find line corresponding to start index.
+      chars_left = start
+      while True:
+        next_line_length = len(lines[line_index])
+        if chars_left - next_line_length < 0:
+          break
+        chars_left -= next_line_length
+        line_index += 1
+      f.seek(start)
+      self.assertEqual(f.readline(), lines[line_index][chars_left:])
+
+  def test_file_write(self):
+    file_name = 'gs://gcsio-test/write_file'
+    file_size = 5 * 1024 * 1024 + 2000
+    contents = os.urandom(file_size)
+    f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
+    f.write(contents[0:1000])
+    f.write(contents[1000:1024 * 1024])
+    f.write(contents[1024 * 1024:])
+    f.close()
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
+
+  def test_file_close(self):
+    file_name = 'gs://gcsio-test/close_file'
+    file_size = 5 * 1024 * 1024 + 2000
+    contents = os.urandom(file_size)
+    f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
+    f.write(contents)
+    f.close()
+    f.close()  # This should not crash.
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
+
+  def test_file_flush(self):
+    file_name = 'gs://gcsio-test/flush_file'
+    file_size = 5 * 1024 * 1024 + 2000
+    contents = os.urandom(file_size)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
+    f.write(contents[0:1000])
+    f.flush()
+    f.write(contents[1000:1024 * 1024])
+    f.flush()
+    f.flush()  # Should be a NOOP.
+    f.write(contents[1024 * 1024:])
+    f.close()  # This should already call the equivalent of flush() in its body.
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
 
-    bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
-    bucket = self.client.get_bucket(bucket_name)
-    blob = bucket.get_blob(blob_name)
+  def test_context_manager(self):
+    # Test writing with a context manager.
+    file_name = 'gs://gcsio-test/context_manager_file'
+    file_size = 1024
+    contents = os.urandom(file_size)
+    with self.gcs.open(file_name, 'w') as f:
+      f.write(contents)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
 
-    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobReader') as reader:
-      self.gcs.open(file_name, read_buffer_size=read_buffer_size)
-      reader.assert_called_with(blob, chunk_size=read_buffer_size)
+    # Test reading with a context manager.
+    with self.gcs.open(file_name) as f:
+      self.assertEqual(f.read(), contents)
 
-  def test_file_write_call(self):
-    file_name = 'gs://gcsio-test/write_file'
-    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobWriter') as writer:
-      self.gcs.open(file_name, 'w')
-      writer.assert_called()
+    # Test that exceptions are not swallowed by the context manager.
+    with self.assertRaises(ZeroDivisionError):
+      with self.gcs.open(file_name) as f:
+        f.read(0 // 0)
 
   def test_list_prefix(self):
     bucket_name = 'gcsio-test'
@@ -472,11 +812,141 @@ class TestGCSIO(unittest.TestCase):
           set(self.gcs.list_prefix(file_pattern).items()),
           set(expected_file_names))
 
+  def test_mime_binary_encoding(self):
+    # This test verifies that the MIME email_generator library works properly
+    # and does not corrupt '\r\n' during uploads (the patch to apitools in
+    # Python 3 is applied in io/gcp/__init__.py).
+    from apitools.base.py.transfer import email_generator
+    generator_cls = email_generator.BytesGenerator
+    output_buffer = io.BytesIO()
+    generator = generator_cls(output_buffer)
+    test_msg = 'a\nb\r\nc\n\r\n\n\nd'
+    message = Message()
+    message.set_payload(test_msg)
+    generator._handle_text(message)
+    self.assertEqual(test_msg.encode('ascii'), output_buffer.getvalue())
+
+  def test_downloader_monitoring_info(self):
+    # Clear the process wide metric container.
+    MetricsEnvironment.process_wide_container().reset()
+
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    self.gcs.open(file_name, 'r')
+
+    resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.get',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
+        monitoring_infos.GCS_PROJECT_ID_LABEL: str(DEFAULT_PROJECT_NUMBER),
+        monitoring_infos.STATUS_LABEL: 'ok'
+    }
+
+    metric_name = MetricName(
+        None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
+    metric_value = MetricsEnvironment.process_wide_container().get_counter(
+        metric_name).get_cumulative()
+
+    self.assertEqual(metric_value, 2)
+
+  @mock.patch.object(FakeGcsBuckets, 'Get')
+  def test_downloader_fail_to_get_project_number(self, mock_get):
+    # Raising an error when listing GCS Bucket so that project number fails to
+    # be retrieved.
+    mock_get.side_effect = HttpError({'status': 403}, None, None)
+    # Clear the process wide metric container.
+    MetricsEnvironment.process_wide_container().reset()
+
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    self.gcs.open(file_name, 'r')
+
+    resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.get',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
+        monitoring_infos.GCS_PROJECT_ID_LABEL: str(DEFAULT_PROJECT_NUMBER),
+        monitoring_infos.STATUS_LABEL: 'ok'
+    }
+
+    metric_name = MetricName(
+        None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
+    metric_value = MetricsEnvironment.process_wide_container().get_counter(
+        metric_name).get_cumulative()
+
+    self.assertEqual(metric_value, 0)
+
+    labels_without_project_id = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.get',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
+        monitoring_infos.STATUS_LABEL: 'ok'
+    }
+    metric_name = MetricName(
+        None,
+        None,
+        urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        labels=labels_without_project_id)
+    metric_value = MetricsEnvironment.process_wide_container().get_counter(
+        metric_name).get_cumulative()
+
+    self.assertEqual(metric_value, 2)
+
   def test_downloader_fail_non_existent_object(self):
     file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
-    with self.assertRaises(NotFound):
+    with self.assertRaises(IOError):
+      self.gcs.open(file_name, 'r')
+
+  def test_downloader_fail_when_getting_metadata(self):
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    self._insert_random_file(
+        self.client, file_name, file_size, fail_when_getting_metadata=True)
+    with self.assertRaises(HttpError):
+      self.gcs.open(file_name, 'r')
+
+  def test_downloader_fail_when_reading(self):
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    self._insert_random_file(
+        self.client, file_name, file_size, fail_when_reading=True)
+    with self.assertRaises(HttpError):
       self.gcs.open(file_name, 'r')
 
+  def test_uploader_monitoring_info(self):
+    # Clear the process wide metric container.
+    MetricsEnvironment.process_wide_container().reset()
+
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    f = self.gcs.open(file_name, 'w')
+
+    resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'Storage',
+        monitoring_infos.METHOD_LABEL: 'Objects.insert',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
+        monitoring_infos.GCS_PROJECT_ID_LABEL: str(DEFAULT_PROJECT_NUMBER),
+        monitoring_infos.STATUS_LABEL: 'ok'
+    }
+
+    f.close()
+    metric_name = MetricName(
+        None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
+    metric_value = MetricsEnvironment.process_wide_container().get_counter(
+        metric_name).get_cumulative()
+
+    self.assertEqual(metric_value, 1)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
new file mode 100644
index 00000000000..b37a4b57c11
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Common imports for generated storage client library."""
+# pylint:disable=wildcard-import
+
+import pkgutil
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import *
+  from apache_beam.io.gcp.internal.clients.storage.storage_v1_client import *
+  from apache_beam.io.gcp.internal.clients.storage.storage_v1_messages import *
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position
+
+__path__ = pkgutil.extend_path(__path__, __name__)  # type: ignore
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
new file mode 100644
index 00000000000..e5b7c0268ec
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
@@ -0,0 +1,1517 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Generated client library for storage version v1."""
+# NOTE: This file is autogenerated and should not be edited by hand.
+
+from apitools.base.py import base_api
+
+from apache_beam.io.gcp.gcsio_overrides import GcsIOOverrides
+from apache_beam.io.gcp.internal.clients.storage import \
+    storage_v1_messages as messages
+
+
+class StorageV1(base_api.BaseApiClient):
+  """Generated client library for service storage version v1."""
+
+  MESSAGES_MODULE = messages
+  BASE_URL = u'https://www.googleapis.com/storage/v1/'
+
+  _PACKAGE = u'storage'
+  _SCOPES = [
+      u'https://www.googleapis.com/auth/cloud-platform',
+      u'https://www.googleapis.com/auth/cloud-platform.read-only',
+      u'https://www.googleapis.com/auth/devstorage.full_control',
+      u'https://www.googleapis.com/auth/devstorage.read_only',
+      u'https://www.googleapis.com/auth/devstorage.read_write'
+  ]
+  _VERSION = u'v1'
+  _CLIENT_ID = '1042881264118.apps.googleusercontent.com'
+  _CLIENT_SECRET = 'x_Tw5K8nnjoRAqULM9PFAC2b'
+  _USER_AGENT = 'x_Tw5K8nnjoRAqULM9PFAC2b'
+  _CLIENT_CLASS_NAME = u'StorageV1'
+  _URL_VERSION = u'v1'
+  _API_KEY = None
+
+  def __init__(
+      self,
+      url='',
+      credentials=None,
+      get_credentials=True,
+      http=None,
+      model=None,
+      log_request=False,
+      log_response=False,
+      credentials_args=None,
+      default_global_params=None,
+      additional_http_headers=None,
+      response_encoding=None):
+    """Create a new storage handle."""
+    url = url or self.BASE_URL
+    super().__init__(
+        url,
+        credentials=credentials,
+        get_credentials=get_credentials,
+        http=http,
+        model=model,
+        log_request=log_request,
+        log_response=log_response,
+        num_retries=20,
+        credentials_args=credentials_args,
+        default_global_params=default_global_params,
+        additional_http_headers=additional_http_headers,
+        retry_func=GcsIOOverrides.retry_func,
+        response_encoding=response_encoding)
+    self.bucketAccessControls = self.BucketAccessControlsService(self)
+    self.buckets = self.BucketsService(self)
+    self.channels = self.ChannelsService(self)
+    self.defaultObjectAccessControls = self.DefaultObjectAccessControlsService(
+        self)
+    self.notifications = self.NotificationsService(self)
+    self.objectAccessControls = self.ObjectAccessControlsService(self)
+    self.objects = self.ObjectsService(self)
+    self.projects_serviceAccount = self.ProjectsServiceAccountService(self)
+    self.projects = self.ProjectsService(self)
+
+  class BucketAccessControlsService(base_api.BaseApiService):
+    """Service class for the bucketAccessControls resource."""
+
+    _NAME = u'bucketAccessControls'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes the ACL entry for the specified entity on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageBucketAccessControlsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.bucketAccessControls.delete',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl/{entity}',
+        request_field='',
+        request_type_name=u'StorageBucketAccessControlsDeleteRequest',
+        response_type_name=u'StorageBucketAccessControlsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""Returns the ACL entry for the specified entity on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.bucketAccessControls.get',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl/{entity}',
+        request_field='',
+        request_type_name=u'StorageBucketAccessControlsGetRequest',
+        response_type_name=u'BucketAccessControl',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a new ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.bucketAccessControls.insert',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl',
+        request_field=u'bucketAccessControl',
+        request_type_name=u'StorageBucketAccessControlsInsertRequest',
+        response_type_name=u'BucketAccessControl',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves ACL entries on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControls) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.bucketAccessControls.list',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl',
+        request_field='',
+        request_type_name=u'StorageBucketAccessControlsListRequest',
+        response_type_name=u'BucketAccessControls',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches an ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.bucketAccessControls.patch',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl/{entity}',
+        request_field=u'bucketAccessControl',
+        request_type_name=u'StorageBucketAccessControlsPatchRequest',
+        response_type_name=u'BucketAccessControl',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates an ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageBucketAccessControlsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (BucketAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.bucketAccessControls.update',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/acl/{entity}',
+        request_field=u'bucketAccessControl',
+        request_type_name=u'StorageBucketAccessControlsUpdateRequest',
+        response_type_name=u'BucketAccessControl',
+        supports_download=False,
+    )
+
+  class BucketsService(base_api.BaseApiService):
+    """Service class for the buckets resource."""
+
+    _NAME = u'buckets'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes an empty bucket.
+
+      Args:
+        request: (StorageBucketsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageBucketsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.buckets.delete',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=
+        [u'ifMetagenerationMatch', u'ifMetagenerationNotMatch', u'userProject'],
+        relative_path=u'b/{bucket}',
+        request_field='',
+        request_type_name=u'StorageBucketsDeleteRequest',
+        response_type_name=u'StorageBucketsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""Returns metadata for the specified bucket.
+
+      Args:
+        request: (StorageBucketsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.buckets.get',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}',
+        request_field='',
+        request_type_name=u'StorageBucketsGetRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+    def GetIamPolicy(self, request, global_params=None):
+      r"""Returns an IAM policy for the specified bucket.
+
+      Args:
+        request: (StorageBucketsGetIamPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Policy) The response message.
+      """
+      config = self.GetMethodConfig('GetIamPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    GetIamPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.buckets.getIamPolicy',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/iam',
+        request_field='',
+        request_type_name=u'StorageBucketsGetIamPolicyRequest',
+        response_type_name=u'Policy',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a new bucket.
+
+      Args:
+        request: (StorageBucketsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.buckets.insert',
+        ordered_params=[u'project'],
+        path_params=[],
+        query_params=[
+            u'predefinedAcl',
+            u'predefinedDefaultObjectAcl',
+            u'project',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b',
+        request_field=u'bucket',
+        request_type_name=u'StorageBucketsInsertRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves a list of buckets for a given project.
+
+      Args:
+        request: (StorageBucketsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Buckets) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.buckets.list',
+        ordered_params=[u'project'],
+        path_params=[],
+        query_params=[
+            u'maxResults',
+            u'pageToken',
+            u'prefix',
+            u'project',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b',
+        request_field='',
+        request_type_name=u'StorageBucketsListRequest',
+        response_type_name=u'Buckets',
+        supports_download=False,
+    )
+
+    def LockRetentionPolicy(self, request, global_params=None):
+      r"""Locks retention policy on a bucket.
+
+      Args:
+        request: (StorageBucketsLockRetentionPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('LockRetentionPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    LockRetentionPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.buckets.lockRetentionPolicy',
+        ordered_params=[u'bucket', u'ifMetagenerationMatch'],
+        path_params=[u'bucket'],
+        query_params=[u'ifMetagenerationMatch', u'userProject'],
+        relative_path=u'b/{bucket}/lockRetentionPolicy',
+        request_field='',
+        request_type_name=u'StorageBucketsLockRetentionPolicyRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches a bucket. Changes to the bucket will be readable immediately after writing, but configuration changes may take time to propagate.
+
+      Args:
+        request: (StorageBucketsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.buckets.patch',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'predefinedAcl',
+            u'predefinedDefaultObjectAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}',
+        request_field=u'bucketResource',
+        request_type_name=u'StorageBucketsPatchRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+    def SetIamPolicy(self, request, global_params=None):
+      r"""Updates an IAM policy for the specified bucket.
+
+      Args:
+        request: (StorageBucketsSetIamPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Policy) The response message.
+      """
+      config = self.GetMethodConfig('SetIamPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    SetIamPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.buckets.setIamPolicy',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/iam',
+        request_field=u'policy',
+        request_type_name=u'StorageBucketsSetIamPolicyRequest',
+        response_type_name=u'Policy',
+        supports_download=False,
+    )
+
+    def TestIamPermissions(self, request, global_params=None):
+      r"""Tests a set of permissions on the given bucket to see which, if any, are held by the caller.
+
+      Args:
+        request: (StorageBucketsTestIamPermissionsRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (TestIamPermissionsResponse) The response message.
+      """
+      config = self.GetMethodConfig('TestIamPermissions')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    TestIamPermissions.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.buckets.testIamPermissions',
+        ordered_params=[u'bucket', u'permissions'],
+        path_params=[u'bucket'],
+        query_params=[u'permissions', u'userProject'],
+        relative_path=u'b/{bucket}/iam/testPermissions',
+        request_field='',
+        request_type_name=u'StorageBucketsTestIamPermissionsRequest',
+        response_type_name=u'TestIamPermissionsResponse',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates a bucket. Changes to the bucket will be readable immediately after writing, but configuration changes may take time to propagate.
+
+      Args:
+        request: (StorageBucketsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Bucket) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.buckets.update',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'predefinedAcl',
+            u'predefinedDefaultObjectAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}',
+        request_field=u'bucketResource',
+        request_type_name=u'StorageBucketsUpdateRequest',
+        response_type_name=u'Bucket',
+        supports_download=False,
+    )
+
+  class ChannelsService(base_api.BaseApiService):
+    """Service class for the channels resource."""
+
+    _NAME = u'channels'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Stop(self, request, global_params=None):
+      r"""Stop watching resources through this channel.
+
+      Args:
+        request: (Channel) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageChannelsStopResponse) The response message.
+      """
+      config = self.GetMethodConfig('Stop')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Stop.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.channels.stop',
+        ordered_params=[],
+        path_params=[],
+        query_params=[],
+        relative_path=u'channels/stop',
+        request_field='<request>',
+        request_type_name=u'Channel',
+        response_type_name=u'StorageChannelsStopResponse',
+        supports_download=False,
+    )
+
+  class DefaultObjectAccessControlsService(base_api.BaseApiService):
+    """Service class for the defaultObjectAccessControls resource."""
+
+    _NAME = u'defaultObjectAccessControls'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes the default object ACL entry for the specified entity on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageDefaultObjectAccessControlsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.defaultObjectAccessControls.delete',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl/{entity}',
+        request_field='',
+        request_type_name=u'StorageDefaultObjectAccessControlsDeleteRequest',
+        response_type_name=u'StorageDefaultObjectAccessControlsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""Returns the default object ACL entry for the specified entity on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.defaultObjectAccessControls.get',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl/{entity}',
+        request_field='',
+        request_type_name=u'StorageDefaultObjectAccessControlsGetRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a new default object ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.defaultObjectAccessControls.insert',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageDefaultObjectAccessControlsInsertRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves default object ACL entries on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControls) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.defaultObjectAccessControls.list',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=
+        [u'ifMetagenerationMatch', u'ifMetagenerationNotMatch', u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl',
+        request_field='',
+        request_type_name=u'StorageDefaultObjectAccessControlsListRequest',
+        response_type_name=u'ObjectAccessControls',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches a default object ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.defaultObjectAccessControls.patch',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl/{entity}',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageDefaultObjectAccessControlsPatchRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates a default object ACL entry on the specified bucket.
+
+      Args:
+        request: (StorageDefaultObjectAccessControlsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.defaultObjectAccessControls.update',
+        ordered_params=[u'bucket', u'entity'],
+        path_params=[u'bucket', u'entity'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/defaultObjectAcl/{entity}',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageDefaultObjectAccessControlsUpdateRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+  class NotificationsService(base_api.BaseApiService):
+    """Service class for the notifications resource."""
+
+    _NAME = u'notifications'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes a notification subscription.
+
+      Args:
+        request: (StorageNotificationsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageNotificationsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.notifications.delete',
+        ordered_params=[u'bucket', u'notification'],
+        path_params=[u'bucket', u'notification'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/notificationConfigs/{notification}',
+        request_field='',
+        request_type_name=u'StorageNotificationsDeleteRequest',
+        response_type_name=u'StorageNotificationsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""View a notification configuration.
+
+      Args:
+        request: (StorageNotificationsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Notification) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.notifications.get',
+        ordered_params=[u'bucket', u'notification'],
+        path_params=[u'bucket', u'notification'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/notificationConfigs/{notification}',
+        request_field='',
+        request_type_name=u'StorageNotificationsGetRequest',
+        response_type_name=u'Notification',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a notification subscription for a given bucket.
+
+      Args:
+        request: (StorageNotificationsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Notification) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.notifications.insert',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/notificationConfigs',
+        request_field=u'notification',
+        request_type_name=u'StorageNotificationsInsertRequest',
+        response_type_name=u'Notification',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves a list of notification subscriptions for a given bucket.
+
+      Args:
+        request: (StorageNotificationsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Notifications) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.notifications.list',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[u'userProject'],
+        relative_path=u'b/{bucket}/notificationConfigs',
+        request_field='',
+        request_type_name=u'StorageNotificationsListRequest',
+        response_type_name=u'Notifications',
+        supports_download=False,
+    )
+
+  class ObjectAccessControlsService(base_api.BaseApiService):
+    """Service class for the objectAccessControls resource."""
+
+    _NAME = u'objectAccessControls'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Delete(self, request, global_params=None):
+      r"""Permanently deletes the ACL entry for the specified entity on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageObjectAccessControlsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.objectAccessControls.delete',
+        ordered_params=[u'bucket', u'object', u'entity'],
+        path_params=[u'bucket', u'entity', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl/{entity}',
+        request_field='',
+        request_type_name=u'StorageObjectAccessControlsDeleteRequest',
+        response_type_name=u'StorageObjectAccessControlsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      r"""Returns the ACL entry for the specified entity on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objectAccessControls.get',
+        ordered_params=[u'bucket', u'object', u'entity'],
+        path_params=[u'bucket', u'entity', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl/{entity}',
+        request_field='',
+        request_type_name=u'StorageObjectAccessControlsGetRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None):
+      r"""Creates a new ACL entry on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objectAccessControls.insert',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageObjectAccessControlsInsertRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves ACL entries on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControls) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objectAccessControls.list',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl',
+        request_field='',
+        request_type_name=u'StorageObjectAccessControlsListRequest',
+        response_type_name=u'ObjectAccessControls',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches an ACL entry on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.objectAccessControls.patch',
+        ordered_params=[u'bucket', u'object', u'entity'],
+        path_params=[u'bucket', u'entity', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl/{entity}',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageObjectAccessControlsPatchRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates an ACL entry on the specified object.
+
+      Args:
+        request: (StorageObjectAccessControlsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ObjectAccessControl) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.objectAccessControls.update',
+        ordered_params=[u'bucket', u'object', u'entity'],
+        path_params=[u'bucket', u'entity', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/acl/{entity}',
+        request_field=u'objectAccessControl',
+        request_type_name=u'StorageObjectAccessControlsUpdateRequest',
+        response_type_name=u'ObjectAccessControl',
+        supports_download=False,
+    )
+
+  class ObjectsService(base_api.BaseApiService):
+    """Service class for the objects resource."""
+
+    _NAME = u'objects'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {
+          'Insert': base_api.ApiUploadInfo(
+              accept=['*/*'],
+              max_size=None,
+              resumable_multipart=True,
+              resumable_path=u'/resumable/upload/storage/v1/b/{bucket}/o',
+              simple_multipart=True,
+              simple_path=u'/upload/storage/v1/b/{bucket}/o',
+          ),
+      }
+
+    def Compose(self, request, global_params=None):
+      r"""Concatenates a list of existing objects into a new object in the same bucket.
+
+      Args:
+        request: (StorageObjectsComposeRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Compose')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Compose.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.compose',
+        ordered_params=[u'destinationBucket', u'destinationObject'],
+        path_params=[u'destinationBucket', u'destinationObject'],
+        query_params=[
+            u'destinationPredefinedAcl',
+            u'ifGenerationMatch',
+            u'ifMetagenerationMatch',
+            u'kmsKeyName',
+            u'userProject'
+        ],
+        relative_path=u'b/{destinationBucket}/o/{destinationObject}/compose',
+        request_field=u'composeRequest',
+        request_type_name=u'StorageObjectsComposeRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def Copy(self, request, global_params=None):
+      r"""Copies a source object to a destination object. Optionally overrides metadata.
+
+      Args:
+        request: (StorageObjectsCopyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Copy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Copy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.copy',
+        ordered_params=[
+            u'sourceBucket',
+            u'sourceObject',
+            u'destinationBucket',
+            u'destinationObject'
+        ],
+        path_params=[
+            u'destinationBucket',
+            u'destinationObject',
+            u'sourceBucket',
+            u'sourceObject'
+        ],
+        query_params=[
+            u'destinationPredefinedAcl',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'ifSourceGenerationMatch',
+            u'ifSourceGenerationNotMatch',
+            u'ifSourceMetagenerationMatch',
+            u'ifSourceMetagenerationNotMatch',
+            u'projection',
+            u'sourceGeneration',
+            u'userProject'
+        ],
+        relative_path=
+        u'b/{sourceBucket}/o/{sourceObject}/copyTo/b/{destinationBucket}/o/{destinationObject}',
+        request_field=u'object',
+        request_type_name=u'StorageObjectsCopyRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def Delete(self, request, global_params=None):
+      r"""Deletes an object and its metadata. Deletions are permanent if versioning is not enabled for the bucket, or if the generation parameter is used.
+
+      Args:
+        request: (StorageObjectsDeleteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (StorageObjectsDeleteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Delete')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Delete.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'DELETE',
+        method_id=u'storage.objects.delete',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[
+            u'generation',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o/{object}',
+        request_field='',
+        request_type_name=u'StorageObjectsDeleteRequest',
+        response_type_name=u'StorageObjectsDeleteResponse',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None, download=None):
+      r"""Retrieves an object or its metadata.
+
+      Args:
+        request: (StorageObjectsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+        download: (Download, default: None) If present, download
+            data from the request via this stream.
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(
+          config, request, global_params=global_params, download=download)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objects.get',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[
+            u'generation',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o/{object}',
+        request_field='',
+        request_type_name=u'StorageObjectsGetRequest',
+        response_type_name=u'Object',
+        supports_download=True,
+    )
+
+    def GetIamPolicy(self, request, global_params=None):
+      r"""Returns an IAM policy for the specified object.
+
+      Args:
+        request: (StorageObjectsGetIamPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Policy) The response message.
+      """
+      config = self.GetMethodConfig('GetIamPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    GetIamPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objects.getIamPolicy',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/iam',
+        request_field='',
+        request_type_name=u'StorageObjectsGetIamPolicyRequest',
+        response_type_name=u'Policy',
+        supports_download=False,
+    )
+
+    def Insert(self, request, global_params=None, upload=None):
+      r"""Stores a new object and metadata.
+
+      Args:
+        request: (StorageObjectsInsertRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+        upload: (Upload, default: None) If present, upload
+            this stream with the request.
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Insert')
+      upload_config = self.GetUploadConfig('Insert')
+      return self._RunMethod(
+          config,
+          request,
+          global_params=global_params,
+          upload=upload,
+          upload_config=upload_config)
+
+    Insert.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.insert',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'contentEncoding',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'kmsKeyName',
+            u'name',
+            u'predefinedAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o',
+        request_field=u'object',
+        request_type_name=u'StorageObjectsInsertRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      r"""Retrieves a list of objects matching the criteria.
+
+      Args:
+        request: (StorageObjectsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Objects) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objects.list',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'delimiter',
+            u'includeTrailingDelimiter',
+            u'maxResults',
+            u'pageToken',
+            u'prefix',
+            u'projection',
+            u'userProject',
+            u'versions'
+        ],
+        relative_path=u'b/{bucket}/o',
+        request_field='',
+        request_type_name=u'StorageObjectsListRequest',
+        response_type_name=u'Objects',
+        supports_download=False,
+    )
+
+    def Patch(self, request, global_params=None):
+      r"""Patches an object's metadata.
+
+      Args:
+        request: (StorageObjectsPatchRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Patch')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Patch.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PATCH',
+        method_id=u'storage.objects.patch',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[
+            u'generation',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'predefinedAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o/{object}',
+        request_field=u'objectResource',
+        request_type_name=u'StorageObjectsPatchRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def Rewrite(self, request, global_params=None):
+      r"""Rewrites a source object to a destination object. Optionally overrides metadata.
+
+      Args:
+        request: (StorageObjectsRewriteRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (RewriteResponse) The response message.
+      """
+      config = self.GetMethodConfig('Rewrite')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Rewrite.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.rewrite',
+        ordered_params=[
+            u'sourceBucket',
+            u'sourceObject',
+            u'destinationBucket',
+            u'destinationObject'
+        ],
+        path_params=[
+            u'destinationBucket',
+            u'destinationObject',
+            u'sourceBucket',
+            u'sourceObject'
+        ],
+        query_params=[
+            u'destinationKmsKeyName',
+            u'destinationPredefinedAcl',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'ifSourceGenerationMatch',
+            u'ifSourceGenerationNotMatch',
+            u'ifSourceMetagenerationMatch',
+            u'ifSourceMetagenerationNotMatch',
+            u'maxBytesRewrittenPerCall',
+            u'projection',
+            u'rewriteToken',
+            u'sourceGeneration',
+            u'userProject'
+        ],
+        relative_path=
+        u'b/{sourceBucket}/o/{sourceObject}/rewriteTo/b/{destinationBucket}/o/{destinationObject}',
+        request_field=u'object',
+        request_type_name=u'StorageObjectsRewriteRequest',
+        response_type_name=u'RewriteResponse',
+        supports_download=False,
+    )
+
+    def SetIamPolicy(self, request, global_params=None):
+      r"""Updates an IAM policy for the specified object.
+
+      Args:
+        request: (StorageObjectsSetIamPolicyRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Policy) The response message.
+      """
+      config = self.GetMethodConfig('SetIamPolicy')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    SetIamPolicy.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.objects.setIamPolicy',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/iam',
+        request_field=u'policy',
+        request_type_name=u'StorageObjectsSetIamPolicyRequest',
+        response_type_name=u'Policy',
+        supports_download=False,
+    )
+
+    def TestIamPermissions(self, request, global_params=None):
+      r"""Tests a set of permissions on the given object to see which, if any, are held by the caller.
+
+      Args:
+        request: (StorageObjectsTestIamPermissionsRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (TestIamPermissionsResponse) The response message.
+      """
+      config = self.GetMethodConfig('TestIamPermissions')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    TestIamPermissions.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.objects.testIamPermissions',
+        ordered_params=[u'bucket', u'object', u'permissions'],
+        path_params=[u'bucket', u'object'],
+        query_params=[u'generation', u'permissions', u'userProject'],
+        relative_path=u'b/{bucket}/o/{object}/iam/testPermissions',
+        request_field='',
+        request_type_name=u'StorageObjectsTestIamPermissionsRequest',
+        response_type_name=u'TestIamPermissionsResponse',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      r"""Updates an object's metadata.
+
+      Args:
+        request: (StorageObjectsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Object) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'storage.objects.update',
+        ordered_params=[u'bucket', u'object'],
+        path_params=[u'bucket', u'object'],
+        query_params=[
+            u'generation',
+            u'ifGenerationMatch',
+            u'ifGenerationNotMatch',
+            u'ifMetagenerationMatch',
+            u'ifMetagenerationNotMatch',
+            u'predefinedAcl',
+            u'projection',
+            u'userProject'
+        ],
+        relative_path=u'b/{bucket}/o/{object}',
+        request_field=u'objectResource',
+        request_type_name=u'StorageObjectsUpdateRequest',
+        response_type_name=u'Object',
+        supports_download=False,
+    )
+
+    def WatchAll(self, request, global_params=None):
+      r"""Watch for changes on all objects in a bucket.
+
+      Args:
+        request: (StorageObjectsWatchAllRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Channel) The response message.
+      """
+      config = self.GetMethodConfig('WatchAll')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    WatchAll.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'storage.objects.watchAll',
+        ordered_params=[u'bucket'],
+        path_params=[u'bucket'],
+        query_params=[
+            u'delimiter',
+            u'includeTrailingDelimiter',
+            u'maxResults',
+            u'pageToken',
+            u'prefix',
+            u'projection',
+            u'userProject',
+            u'versions'
+        ],
+        relative_path=u'b/{bucket}/o/watch',
+        request_field=u'channel',
+        request_type_name=u'StorageObjectsWatchAllRequest',
+        response_type_name=u'Channel',
+        supports_download=False,
+    )
+
+  class ProjectsServiceAccountService(base_api.BaseApiService):
+    """Service class for the projects_serviceAccount resource."""
+
+    _NAME = u'projects_serviceAccount'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
+
+    def Get(self, request, global_params=None):
+      r"""Get the email address of this project's Google Cloud Storage service account.
+
+      Args:
+        request: (StorageProjectsServiceAccountGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ServiceAccount) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'storage.projects.serviceAccount.get',
+        ordered_params=[u'projectId'],
+        path_params=[u'projectId'],
+        query_params=[u'userProject'],
+        relative_path=u'projects/{projectId}/serviceAccount',
+        request_field='',
+        request_type_name=u'StorageProjectsServiceAccountGetRequest',
+        response_type_name=u'ServiceAccount',
+        supports_download=False,
+    )
+
+  class ProjectsService(base_api.BaseApiService):
+    """Service class for the projects resource."""
+
+    _NAME = u'projects'
+
+    def __init__(self, client):
+      super().__init__(client)
+      self._upload_configs = {}
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
new file mode 100644
index 00000000000..caef0eb4b03
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
@@ -0,0 +1,2714 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Generated message classes for storage version v1.
+
+Stores and retrieves potentially large, immutable data objects.
+"""
+# NOTE: This file is autogenerated and should not be edited by hand.
+
+from apitools.base.protorpclite import message_types as _message_types
+from apitools.base.protorpclite import messages as _messages
+from apitools.base.py import encoding, extra_types
+
+package = 'storage'
+
+
+class Bucket(_messages.Message):
+  r"""A bucket.
+
+  Messages:
+    BillingValue: The bucket's billing configuration.
+    CorsValueListEntry: A CorsValueListEntry object.
+    EncryptionValue: Encryption configuration for a bucket.
+    LabelsValue: User-provided labels, in key/value pairs.
+    LifecycleValue: The bucket's lifecycle configuration. See lifecycle
+      management for more information.
+    LoggingValue: The bucket's logging configuration, which defines the
+      destination bucket and optional name prefix for the current bucket's
+      logs.
+    OwnerValue: The owner of the bucket. This is always the project team's
+      owner group.
+    RetentionPolicyValue: The bucket's retention policy. The retention policy
+      enforces a minimum retention time for all objects contained in the
+      bucket, based on their creation time. Any attempt to overwrite or delete
+      objects younger than the retention period will result in a
+      PERMISSION_DENIED error. An unlocked retention policy can be modified or
+      removed from the bucket via a storage.buckets.update operation. A locked
+      retention policy cannot be removed or shortened in duration for the
+      lifetime of the bucket. Attempting to remove or decrease period of a
+      locked retention policy will result in a PERMISSION_DENIED error.
+    VersioningValue: The bucket's versioning configuration.
+    WebsiteValue: The bucket's website configuration, controlling how the
+      service behaves when accessing bucket contents as a web site. See the
+      Static Website Examples for more information.
+
+  Fields:
+    acl: Access controls on the bucket.
+    billing: The bucket's billing configuration.
+    cors: The bucket's Cross-Origin Resource Sharing (CORS) configuration.
+    defaultEventBasedHold: The default value for event-based hold on newly
+      created objects in this bucket. Event-based hold is a way to retain
+      objects indefinitely until an event occurs, signified by the hold's
+      release. After being released, such objects will be subject to bucket-
+      level retention (if any). One sample use case of this flag is for banks
+      to hold loan documents for at least 3 years after loan is paid in full.
+      Here, bucket-level retention is 3 years and the event is loan being paid
+      in full. In this example, these objects will be held intact for any
+      number of years until the event has occurred (event-based hold on the
+      object is released) and then 3 more years after that. That means
+      retention duration of the objects begins from the moment event-based
+      hold transitioned from true to false. Objects under event-based hold
+      cannot be deleted, overwritten or archived until the hold is removed.
+    defaultObjectAcl: Default access controls to apply to new objects when no
+      ACL is provided.
+    encryption: Encryption configuration for a bucket.
+    etag: HTTP 1.1 Entity tag for the bucket.
+    id: The ID of the bucket. For buckets, the id and name properties are the
+      same.
+    kind: The kind of item this is. For buckets, this is always
+      storage#bucket.
+    labels: User-provided labels, in key/value pairs.
+    lifecycle: The bucket's lifecycle configuration. See lifecycle management
+      for more information.
+    location: The location of the bucket. Object data for objects in the
+      bucket resides in physical storage within this region. Defaults to US.
+      See the developer's guide for the authoritative list.
+    logging: The bucket's logging configuration, which defines the destination
+      bucket and optional name prefix for the current bucket's logs.
+    metageneration: The metadata generation of this bucket.
+    name: The name of the bucket.
+    owner: The owner of the bucket. This is always the project team's owner
+      group.
+    projectNumber: The project number of the project the bucket belongs to.
+    retentionPolicy: The bucket's retention policy. The retention policy
+      enforces a minimum retention time for all objects contained in the
+      bucket, based on their creation time. Any attempt to overwrite or delete
+      objects younger than the retention period will result in a
+      PERMISSION_DENIED error. An unlocked retention policy can be modified or
+      removed from the bucket via a storage.buckets.update operation. A locked
+      retention policy cannot be removed or shortened in duration for the
+      lifetime of the bucket. Attempting to remove or decrease period of a
+      locked retention policy will result in a PERMISSION_DENIED error.
+    selfLink: The URI of this bucket.
+    storageClass: The bucket's default storage class, used whenever no
+      storageClass is specified for a newly-created object. This defines how
+      objects in the bucket are stored and determines the SLA and the cost of
+      storage. Values include MULTI_REGIONAL, REGIONAL, STANDARD, NEARLINE,
+      COLDLINE, and DURABLE_REDUCED_AVAILABILITY. If this value is not
+      specified when the bucket is created, it will default to STANDARD. For
+      more information, see storage classes.
+    timeCreated: The creation time of the bucket in RFC 3339 format.
+    updated: The modification time of the bucket in RFC 3339 format.
+    versioning: The bucket's versioning configuration.
+    website: The bucket's website configuration, controlling how the service
+      behaves when accessing bucket contents as a web site. See the Static
+      Website Examples for more information.
+  """
+  class BillingValue(_messages.Message):
+    r"""The bucket's billing configuration.
+
+    Fields:
+      requesterPays: When set to true, Requester Pays is enabled for this
+        bucket.
+    """
+
+    requesterPays = _messages.BooleanField(1)
+
+  class CorsValueListEntry(_messages.Message):
+    r"""A CorsValueListEntry object.
+
+    Fields:
+      maxAgeSeconds: The value, in seconds, to return in the  Access-Control-
+        Max-Age header used in preflight responses.
+      method: The list of HTTP methods on which to include CORS response
+        headers, (GET, OPTIONS, POST, etc) Note: "*" is permitted in the list
+        of methods, and means "any method".
+      origin: The list of Origins eligible to receive CORS response headers.
+        Note: "*" is permitted in the list of origins, and means "any Origin".
+      responseHeader: The list of HTTP headers other than the simple response
+        headers to give permission for the user-agent to share across domains.
+    """
+
+    maxAgeSeconds = _messages.IntegerField(1, variant=_messages.Variant.INT32)
+    method = _messages.StringField(2, repeated=True)
+    origin = _messages.StringField(3, repeated=True)
+    responseHeader = _messages.StringField(4, repeated=True)
+
+  class EncryptionValue(_messages.Message):
+    r"""Encryption configuration for a bucket.
+
+    Fields:
+      defaultKmsKeyName: A Cloud KMS key that will be used to encrypt objects
+        inserted into this bucket, if no encryption method is specified.
+    """
+
+    defaultKmsKeyName = _messages.StringField(1)
+
+  @encoding.MapUnrecognizedFields('additionalProperties')
+  class LabelsValue(_messages.Message):
+    r"""User-provided labels, in key/value pairs.
+
+    Messages:
+      AdditionalProperty: An additional property for a LabelsValue object.
+
+    Fields:
+      additionalProperties: An individual label entry.
+    """
+    class AdditionalProperty(_messages.Message):
+      r"""An additional property for a LabelsValue object.
+
+      Fields:
+        key: Name of the additional property.
+        value: A string attribute.
+      """
+
+      key = _messages.StringField(1)
+      value = _messages.StringField(2)
+
+    additionalProperties = _messages.MessageField(
+        'AdditionalProperty', 1, repeated=True)
+
+  class LifecycleValue(_messages.Message):
+    r"""The bucket's lifecycle configuration. See lifecycle management for
+    more information.
+
+    Messages:
+      RuleValueListEntry: A RuleValueListEntry object.
+
+    Fields:
+      rule: A lifecycle management rule, which is made of an action to take
+        and the condition(s) under which the action will be taken.
+    """
+    class RuleValueListEntry(_messages.Message):
+      r"""A RuleValueListEntry object.
+
+      Messages:
+        ActionValue: The action to take.
+        ConditionValue: The condition(s) under which the action will be taken.
+
+      Fields:
+        action: The action to take.
+        condition: The condition(s) under which the action will be taken.
+      """
+      class ActionValue(_messages.Message):
+        r"""The action to take.
+
+        Fields:
+          storageClass: Target storage class. Required iff the type of the
+            action is SetStorageClass.
+          type: Type of the action. Currently, only Delete and SetStorageClass
+            are supported.
+        """
+
+        storageClass = _messages.StringField(1)
+        type = _messages.StringField(2)
+
+      class ConditionValue(_messages.Message):
+        r"""The condition(s) under which the action will be taken.
+
+        Fields:
+          age: Age of an object (in days). This condition is satisfied when an
+            object reaches the specified age.
+          createdBefore: A date in RFC 3339 format with only the date part
+            (for instance, "2013-01-15"). This condition is satisfied when an
+            object is created before midnight of the specified date in UTC.
+          isLive: Relevant only for versioned objects. If the value is true,
+            this condition matches live objects; if the value is false, it
+            matches archived objects.
+          matchesPattern: A regular expression that satisfies the RE2 syntax.
+            This condition is satisfied when the name of the object matches
+            the RE2 pattern. Note: This feature is currently in the "Early
+            Access" launch stage and is only available to a whitelisted set of
+            users; that means that this feature may be changed in backward-
+            incompatible ways and that it is not guaranteed to be released.
+          matchesStorageClass: Objects having any of the storage classes
+            specified by this condition will be matched. Values include
+            MULTI_REGIONAL, REGIONAL, NEARLINE, COLDLINE, STANDARD, and
+            DURABLE_REDUCED_AVAILABILITY.
+          numNewerVersions: Relevant only for versioned objects. If the value
+            is N, this condition is satisfied when there are at least N
+            versions (including the live version) newer than this version of
+            the object.
+        """
+
+        age = _messages.IntegerField(1, variant=_messages.Variant.INT32)
+        createdBefore = extra_types.DateField(2)
+        isLive = _messages.BooleanField(3)
+        matchesPattern = _messages.StringField(4)
+        matchesStorageClass = _messages.StringField(5, repeated=True)
+        numNewerVersions = _messages.IntegerField(
+            6, variant=_messages.Variant.INT32)
+
+      action = _messages.MessageField('ActionValue', 1)
+      condition = _messages.MessageField('ConditionValue', 2)
+
+    rule = _messages.MessageField('RuleValueListEntry', 1, repeated=True)
+
+  class LoggingValue(_messages.Message):
+    r"""The bucket's logging configuration, which defines the destination
+    bucket and optional name prefix for the current bucket's logs.
+
+    Fields:
+      logBucket: The destination bucket where the current bucket's logs should
+        be placed.
+      logObjectPrefix: A prefix for log object names.
+    """
+
+    logBucket = _messages.StringField(1)
+    logObjectPrefix = _messages.StringField(2)
+
+  class OwnerValue(_messages.Message):
+    r"""The owner of the bucket. This is always the project team's owner
+    group.
+
+    Fields:
+      entity: The entity, in the form project-owner-projectId.
+      entityId: The ID for the entity.
+    """
+
+    entity = _messages.StringField(1)
+    entityId = _messages.StringField(2)
+
+  class RetentionPolicyValue(_messages.Message):
+    r"""The bucket's retention policy. The retention policy enforces a minimum
+    retention time for all objects contained in the bucket, based on their
+    creation time. Any attempt to overwrite or delete objects younger than the
+    retention period will result in a PERMISSION_DENIED error. An unlocked
+    retention policy can be modified or removed from the bucket via a
+    storage.buckets.update operation. A locked retention policy cannot be
+    removed or shortened in duration for the lifetime of the bucket.
+    Attempting to remove or decrease period of a locked retention policy will
+    result in a PERMISSION_DENIED error.
+
+    Fields:
+      effectiveTime: Server-determined value that indicates the time from
+        which policy was enforced and effective. This value is in RFC 3339
+        format.
+      isLocked: Once locked, an object retention policy cannot be modified.
+      retentionPeriod: The duration in seconds that objects need to be
+        retained. Retention duration must be greater than zero and less than
+        100 years. Note that enforcement of retention periods less than a day
+        is not guaranteed. Such periods should only be used for testing
+        purposes.
+    """
+
+    effectiveTime = _message_types.DateTimeField(1)
+    isLocked = _messages.BooleanField(2)
+    retentionPeriod = _messages.IntegerField(3)
+
+  class VersioningValue(_messages.Message):
+    r"""The bucket's versioning configuration.
+
+    Fields:
+      enabled: While set to true, versioning is fully enabled for this bucket.
+    """
+
+    enabled = _messages.BooleanField(1)
+
+  class WebsiteValue(_messages.Message):
+    r"""The bucket's website configuration, controlling how the service
+    behaves when accessing bucket contents as a web site. See the Static
+    Website Examples for more information.
+
+    Fields:
+      mainPageSuffix: If the requested object path is missing, the service
+        will ensure the path has a trailing '/', append this suffix, and
+        attempt to retrieve the resulting object. This allows the creation of
+        index.html objects to represent directory pages.
+      notFoundPage: If the requested object path is missing, and any
+        mainPageSuffix object is missing, if applicable, the service will
+        return the named object from this bucket as the content for a 404 Not
+        Found result.
+    """
+
+    mainPageSuffix = _messages.StringField(1)
+    notFoundPage = _messages.StringField(2)
+
+  acl = _messages.MessageField('BucketAccessControl', 1, repeated=True)
+  billing = _messages.MessageField('BillingValue', 2)
+  cors = _messages.MessageField('CorsValueListEntry', 3, repeated=True)
+  defaultEventBasedHold = _messages.BooleanField(4)
+  defaultObjectAcl = _messages.MessageField(
+      'ObjectAccessControl', 5, repeated=True)
+  encryption = _messages.MessageField('EncryptionValue', 6)
+  etag = _messages.StringField(7)
+  id = _messages.StringField(8)
+  kind = _messages.StringField(9, default=u'storage#bucket')
+  labels = _messages.MessageField('LabelsValue', 10)
+  lifecycle = _messages.MessageField('LifecycleValue', 11)
+  location = _messages.StringField(12)
+  logging = _messages.MessageField('LoggingValue', 13)
+  metageneration = _messages.IntegerField(14)
+  name = _messages.StringField(15)
+  owner = _messages.MessageField('OwnerValue', 16)
+  projectNumber = _messages.IntegerField(17, variant=_messages.Variant.UINT64)
+  retentionPolicy = _messages.MessageField('RetentionPolicyValue', 18)
+  selfLink = _messages.StringField(19)
+  storageClass = _messages.StringField(20)
+  timeCreated = _message_types.DateTimeField(21)
+  updated = _message_types.DateTimeField(22)
+  versioning = _messages.MessageField('VersioningValue', 23)
+  website = _messages.MessageField('WebsiteValue', 24)
+
+
+class BucketAccessControl(_messages.Message):
+  r"""An access-control entry.
+
+  Messages:
+    ProjectTeamValue: The project team associated with the entity, if any.
+
+  Fields:
+    bucket: The name of the bucket.
+    domain: The domain associated with the entity, if any.
+    email: The email address associated with the entity, if any.
+    entity: The entity holding the permission, in one of the following forms:
+      - user-userId  - user-email  - group-groupId  - group-email  - domain-
+      domain  - project-team-projectId  - allUsers  - allAuthenticatedUsers
+      Examples:  - The user liz@example.com would be user-liz@example.com.  -
+      The group example@googlegroups.com would be group-
+      example@googlegroups.com.  - To refer to all members of the Google Apps
+      for Business domain example.com, the entity would be domain-example.com.
+    entityId: The ID for the entity, if any.
+    etag: HTTP 1.1 Entity tag for the access-control entry.
+    id: The ID of the access-control entry.
+    kind: The kind of item this is. For bucket access control entries, this is
+      always storage#bucketAccessControl.
+    projectTeam: The project team associated with the entity, if any.
+    role: The access permission for the entity.
+    selfLink: The link to this access-control entry.
+  """
+  class ProjectTeamValue(_messages.Message):
+    r"""The project team associated with the entity, if any.
+
+    Fields:
+      projectNumber: The project number.
+      team: The team.
+    """
+
+    projectNumber = _messages.StringField(1)
+    team = _messages.StringField(2)
+
+  bucket = _messages.StringField(1)
+  domain = _messages.StringField(2)
+  email = _messages.StringField(3)
+  entity = _messages.StringField(4)
+  entityId = _messages.StringField(5)
+  etag = _messages.StringField(6)
+  id = _messages.StringField(7)
+  kind = _messages.StringField(8, default=u'storage#bucketAccessControl')
+  projectTeam = _messages.MessageField('ProjectTeamValue', 9)
+  role = _messages.StringField(10)
+  selfLink = _messages.StringField(11)
+
+
+class BucketAccessControls(_messages.Message):
+  r"""An access-control list.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of bucket access control
+      entries, this is always storage#bucketAccessControls.
+  """
+
+  items = _messages.MessageField('BucketAccessControl', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#bucketAccessControls')
+
+
+class Buckets(_messages.Message):
+  r"""A list of buckets.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of buckets, this is always
+      storage#buckets.
+    nextPageToken: The continuation token, used to page through large result
+      sets. Provide this value in a subsequent request to return the next page
+      of results.
+  """
+
+  items = _messages.MessageField('Bucket', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#buckets')
+  nextPageToken = _messages.StringField(3)
+
+
+class Channel(_messages.Message):
+  r"""An notification channel used to watch for resource changes.
+
+  Messages:
+    ParamsValue: Additional parameters controlling delivery channel behavior.
+      Optional.
+
+  Fields:
+    address: The address where notifications are delivered for this channel.
+    expiration: Date and time of notification channel expiration, expressed as
+      a Unix timestamp, in milliseconds. Optional.
+    id: A UUID or similar unique string that identifies this channel.
+    kind: Identifies this as a notification channel used to watch for changes
+      to a resource. Value: the fixed string "api#channel".
+    params: Additional parameters controlling delivery channel behavior.
+      Optional.
+    payload: A Boolean value to indicate whether payload is wanted. Optional.
+    resourceId: An opaque ID that identifies the resource being watched on
+      this channel. Stable across different API versions.
+    resourceUri: A version-specific identifier for the watched resource.
+    token: An arbitrary string delivered to the target address with each
+      notification delivered over this channel. Optional.
+    type: The type of delivery mechanism used for this channel.
+  """
+  @encoding.MapUnrecognizedFields('additionalProperties')
+  class ParamsValue(_messages.Message):
+    r"""Additional parameters controlling delivery channel behavior. Optional.
+
+    Messages:
+      AdditionalProperty: An additional property for a ParamsValue object.
+
+    Fields:
+      additionalProperties: Declares a new parameter by name.
+    """
+    class AdditionalProperty(_messages.Message):
+      r"""An additional property for a ParamsValue object.
+
+      Fields:
+        key: Name of the additional property.
+        value: A string attribute.
+      """
+
+      key = _messages.StringField(1)
+      value = _messages.StringField(2)
+
+    additionalProperties = _messages.MessageField(
+        'AdditionalProperty', 1, repeated=True)
+
+  address = _messages.StringField(1)
+  expiration = _messages.IntegerField(2)
+  id = _messages.StringField(3)
+  kind = _messages.StringField(4, default=u'api#channel')
+  params = _messages.MessageField('ParamsValue', 5)
+  payload = _messages.BooleanField(6)
+  resourceId = _messages.StringField(7)
+  resourceUri = _messages.StringField(8)
+  token = _messages.StringField(9)
+  type = _messages.StringField(10)
+
+
+class ComposeRequest(_messages.Message):
+  r"""A Compose request.
+
+  Messages:
+    SourceObjectsValueListEntry: A SourceObjectsValueListEntry object.
+
+  Fields:
+    destination: Properties of the resulting object.
+    kind: The kind of item this is.
+    sourceObjects: The list of source objects that will be concatenated into a
+      single object.
+  """
+  class SourceObjectsValueListEntry(_messages.Message):
+    r"""A SourceObjectsValueListEntry object.
+
+    Messages:
+      ObjectPreconditionsValue: Conditions that must be met for this operation
+        to execute.
+
+    Fields:
+      generation: The generation of this object to use as the source.
+      name: The source object's name. All source objects must reside in the
+        same bucket.
+      objectPreconditions: Conditions that must be met for this operation to
+        execute.
+    """
+    class ObjectPreconditionsValue(_messages.Message):
+      r"""Conditions that must be met for this operation to execute.
+
+      Fields:
+        ifGenerationMatch: Only perform the composition if the generation of
+          the source object that would be used matches this value. If this
+          value and a generation are both specified, they must be the same
+          value or the call will fail.
+      """
+
+      ifGenerationMatch = _messages.IntegerField(1)
+
+    generation = _messages.IntegerField(1)
+    name = _messages.StringField(2)
+    objectPreconditions = _messages.MessageField('ObjectPreconditionsValue', 3)
+
+  destination = _messages.MessageField('Object', 1)
+  kind = _messages.StringField(2, default=u'storage#composeRequest')
+  sourceObjects = _messages.MessageField(
+      'SourceObjectsValueListEntry', 3, repeated=True)
+
+
+class Notification(_messages.Message):
+  r"""A subscription to receive Google PubSub notifications.
+
+  Messages:
+    CustomAttributesValue: An optional list of additional attributes to attach
+      to each Cloud PubSub message published for this notification
+      subscription.
+
+  Fields:
+    custom_attributes: An optional list of additional attributes to attach to
+      each Cloud PubSub message published for this notification subscription.
+    etag: HTTP 1.1 Entity tag for this subscription notification.
+    event_types: If present, only send notifications about listed event types.
+      If empty, sent notifications for all event types.
+    id: The ID of the notification.
+    kind: The kind of item this is. For notifications, this is always
+      storage#notification.
+    object_name_prefix: If present, only apply this notification configuration
+      to object names that begin with this prefix.
+    payload_format: The desired content of the Payload.
+    selfLink: The canonical URL of this notification.
+    topic: The Cloud PubSub topic to which this subscription publishes.
+      Formatted as: '//pubsub.googleapis.com/projects/{project-
+      identifier}/topics/{my-topic}'
+  """
+  @encoding.MapUnrecognizedFields('additionalProperties')
+  class CustomAttributesValue(_messages.Message):
+    r"""An optional list of additional attributes to attach to each Cloud
+    PubSub message published for this notification subscription.
+
+    Messages:
+      AdditionalProperty: An additional property for a CustomAttributesValue
+        object.
+
+    Fields:
+      additionalProperties: Additional properties of type
+        CustomAttributesValue
+    """
+    class AdditionalProperty(_messages.Message):
+      r"""An additional property for a CustomAttributesValue object.
+
+      Fields:
+        key: Name of the additional property.
+        value: A string attribute.
+      """
+
+      key = _messages.StringField(1)
+      value = _messages.StringField(2)
+
+    additionalProperties = _messages.MessageField(
+        'AdditionalProperty', 1, repeated=True)
+
+  custom_attributes = _messages.MessageField('CustomAttributesValue', 1)
+  etag = _messages.StringField(2)
+  event_types = _messages.StringField(3, repeated=True)
+  id = _messages.StringField(4)
+  kind = _messages.StringField(5, default=u'storage#notification')
+  object_name_prefix = _messages.StringField(6)
+  payload_format = _messages.StringField(7, default=u'JSON_API_V1')
+  selfLink = _messages.StringField(8)
+  topic = _messages.StringField(9)
+
+
+class Notifications(_messages.Message):
+  r"""A list of notification subscriptions.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of notifications, this is always
+      storage#notifications.
+  """
+
+  items = _messages.MessageField('Notification', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#notifications')
+
+
+class Object(_messages.Message):
+  r"""An object.
+
+  Messages:
+    CustomerEncryptionValue: Metadata of customer-supplied encryption key, if
+      the object is encrypted by such a key.
+    MetadataValue: User-provided metadata, in key/value pairs.
+    OwnerValue: The owner of the object. This will always be the uploader of
+      the object.
+
+  Fields:
+    acl: Access controls on the object.
+    bucket: The name of the bucket containing this object.
+    cacheControl: Cache-Control directive for the object data. If omitted, and
+      the object is accessible to all anonymous users, the default will be
+      public, max-age=3600.
+    componentCount: Number of underlying components that make up this object.
+      Components are accumulated by compose operations.
+    contentDisposition: Content-Disposition of the object data.
+    contentEncoding: Content-Encoding of the object data.
+    contentLanguage: Content-Language of the object data.
+    contentType: Content-Type of the object data. If an object is stored
+      without a Content-Type, it is served as application/octet-stream.
+    crc32c: CRC32c checksum, as described in RFC 4960, Appendix B; encoded
+      using base64 in big-endian byte order. For more information about using
+      the CRC32c checksum, see Hashes and ETags: Best Practices.
+    customerEncryption: Metadata of customer-supplied encryption key, if the
+      object is encrypted by such a key.
+    etag: HTTP 1.1 Entity tag for the object.
+    eventBasedHold: Whether an object is under event-based hold. Event-based
+      hold is a way to retain objects until an event occurs, which is
+      signified by the hold's release (i.e. this value is set to false). After
+      being released (set to false), such objects will be subject to bucket-
+      level retention (if any). One sample use case of this flag is for banks
+      to hold loan documents for at least 3 years after loan is paid in full.
+      Here, bucket-level retention is 3 years and the event is the loan being
+      paid in full. In this example, these objects will be held intact for any
+      number of years until the event has occurred (event-based hold on the
+      object is released) and then 3 more years after that. That means
+      retention duration of the objects begins from the moment event-based
+      hold transitioned from true to false.
+    generation: The content generation of this object. Used for object
+      versioning.
+    id: The ID of the object, including the bucket name, object name, and
+      generation number.
+    kind: The kind of item this is. For objects, this is always
+      storage#object.
+    kmsKeyName: Cloud KMS Key used to encrypt this object, if the object is
+      encrypted by such a key.
+    md5Hash: MD5 hash of the data; encoded using base64. For more information
+      about using the MD5 hash, see Hashes and ETags: Best Practices.
+    mediaLink: Media download link.
+    metadata: User-provided metadata, in key/value pairs.
+    metageneration: The version of the metadata for this object at this
+      generation. Used for preconditions and for detecting changes in
+      metadata. A metageneration number is only meaningful in the context of a
+      particular generation of a particular object.
+    name: The name of the object. Required if not specified by URL parameter.
+    owner: The owner of the object. This will always be the uploader of the
+      object.
+    retentionExpirationTime: A server-determined value that specifies the
+      earliest time that the object's retention period expires. This value is
+      in RFC 3339 format. Note 1: This field is not provided for objects with
+      an active event-based hold, since retention expiration is unknown until
+      the hold is removed. Note 2: This value can be provided even when
+      temporary hold is set (so that the user can reason about policy without
+      having to first unset the temporary hold).
+    selfLink: The link to this object.
+    size: Content-Length of the data in bytes.
+    storageClass: Storage class of the object.
+    temporaryHold: Whether an object is under temporary hold. While this flag
+      is set to true, the object is protected against deletion and overwrites.
+      A common use case of this flag is regulatory investigations where
+      objects need to be retained while the investigation is ongoing. Note
+      that unlike event-based hold, temporary hold does not impact retention
+      expiration time of an object.
+    timeCreated: The creation time of the object in RFC 3339 format.
+    timeDeleted: The deletion time of the object in RFC 3339 format. Will be
+      returned if and only if this version of the object has been deleted.
+    timeStorageClassUpdated: The time at which the object's storage class was
+      last changed. When the object is initially created, it will be set to
+      timeCreated.
+    updated: The modification time of the object metadata in RFC 3339 format.
+  """
+  class CustomerEncryptionValue(_messages.Message):
+    r"""Metadata of customer-supplied encryption key, if the object is
+    encrypted by such a key.
+
+    Fields:
+      encryptionAlgorithm: The encryption algorithm.
+      keySha256: SHA256 hash value of the encryption key.
+    """
+
+    encryptionAlgorithm = _messages.StringField(1)
+    keySha256 = _messages.StringField(2)
+
+  @encoding.MapUnrecognizedFields('additionalProperties')
+  class MetadataValue(_messages.Message):
+    r"""User-provided metadata, in key/value pairs.
+
+    Messages:
+      AdditionalProperty: An additional property for a MetadataValue object.
+
+    Fields:
+      additionalProperties: An individual metadata entry.
+    """
+    class AdditionalProperty(_messages.Message):
+      r"""An additional property for a MetadataValue object.
+
+      Fields:
+        key: Name of the additional property.
+        value: A string attribute.
+      """
+
+      key = _messages.StringField(1)
+      value = _messages.StringField(2)
+
+    additionalProperties = _messages.MessageField(
+        'AdditionalProperty', 1, repeated=True)
+
+  class OwnerValue(_messages.Message):
+    r"""The owner of the object. This will always be the uploader of the
+    object.
+
+    Fields:
+      entity: The entity, in the form user-userId.
+      entityId: The ID for the entity.
+    """
+
+    entity = _messages.StringField(1)
+    entityId = _messages.StringField(2)
+
+  acl = _messages.MessageField('ObjectAccessControl', 1, repeated=True)
+  bucket = _messages.StringField(2)
+  cacheControl = _messages.StringField(3)
+  componentCount = _messages.IntegerField(4, variant=_messages.Variant.INT32)
+  contentDisposition = _messages.StringField(5)
+  contentEncoding = _messages.StringField(6)
+  contentLanguage = _messages.StringField(7)
+  contentType = _messages.StringField(8)
+  crc32c = _messages.StringField(9)
+  customerEncryption = _messages.MessageField('CustomerEncryptionValue', 10)
+  etag = _messages.StringField(11)
+  eventBasedHold = _messages.BooleanField(12)
+  generation = _messages.IntegerField(13)
+  id = _messages.StringField(14)
+  kind = _messages.StringField(15, default=u'storage#object')
+  kmsKeyName = _messages.StringField(16)
+  md5Hash = _messages.StringField(17)
+  mediaLink = _messages.StringField(18)
+  metadata = _messages.MessageField('MetadataValue', 19)
+  metageneration = _messages.IntegerField(20)
+  name = _messages.StringField(21)
+  owner = _messages.MessageField('OwnerValue', 22)
+  retentionExpirationTime = _message_types.DateTimeField(23)
+  selfLink = _messages.StringField(24)
+  size = _messages.IntegerField(25, variant=_messages.Variant.UINT64)
+  storageClass = _messages.StringField(26)
+  temporaryHold = _messages.BooleanField(27)
+  timeCreated = _message_types.DateTimeField(28)
+  timeDeleted = _message_types.DateTimeField(29)
+  timeStorageClassUpdated = _message_types.DateTimeField(30)
+  updated = _message_types.DateTimeField(31)
+
+
+class ObjectAccessControl(_messages.Message):
+  r"""An access-control entry.
+
+  Messages:
+    ProjectTeamValue: The project team associated with the entity, if any.
+
+  Fields:
+    bucket: The name of the bucket.
+    domain: The domain associated with the entity, if any.
+    email: The email address associated with the entity, if any.
+    entity: The entity holding the permission, in one of the following forms:
+      - user-userId  - user-email  - group-groupId  - group-email  - domain-
+      domain  - project-team-projectId  - allUsers  - allAuthenticatedUsers
+      Examples:  - The user liz@example.com would be user-liz@example.com.  -
+      The group example@googlegroups.com would be group-
+      example@googlegroups.com.  - To refer to all members of the Google Apps
+      for Business domain example.com, the entity would be domain-example.com.
+    entityId: The ID for the entity, if any.
+    etag: HTTP 1.1 Entity tag for the access-control entry.
+    generation: The content generation of the object, if applied to an object.
+    id: The ID of the access-control entry.
+    kind: The kind of item this is. For object access control entries, this is
+      always storage#objectAccessControl.
+    object: The name of the object, if applied to an object.
+    projectTeam: The project team associated with the entity, if any.
+    role: The access permission for the entity.
+    selfLink: The link to this access-control entry.
+  """
+  class ProjectTeamValue(_messages.Message):
+    r"""The project team associated with the entity, if any.
+
+    Fields:
+      projectNumber: The project number.
+      team: The team.
+    """
+
+    projectNumber = _messages.StringField(1)
+    team = _messages.StringField(2)
+
+  bucket = _messages.StringField(1)
+  domain = _messages.StringField(2)
+  email = _messages.StringField(3)
+  entity = _messages.StringField(4)
+  entityId = _messages.StringField(5)
+  etag = _messages.StringField(6)
+  generation = _messages.IntegerField(7)
+  id = _messages.StringField(8)
+  kind = _messages.StringField(9, default=u'storage#objectAccessControl')
+  object = _messages.StringField(10)
+  projectTeam = _messages.MessageField('ProjectTeamValue', 11)
+  role = _messages.StringField(12)
+  selfLink = _messages.StringField(13)
+
+
+class ObjectAccessControls(_messages.Message):
+  r"""An access-control list.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of object access control
+      entries, this is always storage#objectAccessControls.
+  """
+
+  items = _messages.MessageField('ObjectAccessControl', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#objectAccessControls')
+
+
+class Objects(_messages.Message):
+  r"""A list of objects.
+
+  Fields:
+    items: The list of items.
+    kind: The kind of item this is. For lists of objects, this is always
+      storage#objects.
+    nextPageToken: The continuation token, used to page through large result
+      sets. Provide this value in a subsequent request to return the next page
+      of results.
+    prefixes: The list of prefixes of objects matching-but-not-listed up to
+      and including the requested delimiter.
+  """
+
+  items = _messages.MessageField('Object', 1, repeated=True)
+  kind = _messages.StringField(2, default=u'storage#objects')
+  nextPageToken = _messages.StringField(3)
+  prefixes = _messages.StringField(4, repeated=True)
+
+
+class Policy(_messages.Message):
+  r"""A bucket/object IAM policy.
+
+  Messages:
+    BindingsValueListEntry: A BindingsValueListEntry object.
+
+  Fields:
+    bindings: An association between a role, which comes with a set of
+      permissions, and members who may assume that role.
+    etag: HTTP 1.1  Entity tag for the policy.
+    kind: The kind of item this is. For policies, this is always
+      storage#policy. This field is ignored on input.
+    resourceId: The ID of the resource to which this policy belongs. Will be
+      of the form projects/_/buckets/bucket for buckets, and
+      projects/_/buckets/bucket/objects/object for objects. A specific
+      generation may be specified by appending #generationNumber to the end of
+      the object name, e.g. projects/_/buckets/my-bucket/objects/data.txt#17.
+      The current generation can be denoted with #0. This field is ignored on
+      input.
+  """
+  class BindingsValueListEntry(_messages.Message):
+    r"""A BindingsValueListEntry object.
+
+    Fields:
+      condition: A extra_types.JsonValue attribute.
+      members: A collection of identifiers for members who may assume the
+        provided role. Recognized identifiers are as follows:   - allUsers - A
+        special identifier that represents anyone on the internet; with or
+        without a Google account.   - allAuthenticatedUsers - A special
+        identifier that represents anyone who is authenticated with a Google
+        account or a service account.   - user:emailid - An email address that
+        represents a specific account. For example, user:alice@gmail.com or
+        user:joe@example.com.   - serviceAccount:emailid - An email address
+        that represents a service account. For example,  serviceAccount:my-
+        other-app@appspot.gserviceaccount.com .   - group:emailid - An email
+        address that represents a Google group. For example,
+        group:admins@example.com.   - domain:domain - A Google Apps domain
+        name that represents all the users of that domain. For example,
+        domain:google.com or domain:example.com.   - projectOwner:projectid -
+        Owners of the given project. For example, projectOwner:my-example-
+        project   - projectEditor:projectid - Editors of the given project.
+        For example, projectEditor:my-example-project   -
+        projectViewer:projectid - Viewers of the given project. For example,
+        projectViewer:my-example-project
+      role: The role to which members belong. Two types of roles are
+        supported: new IAM roles, which grant permissions that do not map
+        directly to those provided by ACLs, and legacy IAM roles, which do map
+        directly to ACL permissions. All roles are of the format
+        roles/storage.specificRole. The new IAM roles are:   -
+        roles/storage.admin - Full control of Google Cloud Storage resources.
+        - roles/storage.objectViewer - Read-Only access to Google Cloud
+        Storage objects.   - roles/storage.objectCreator - Access to create
+        objects in Google Cloud Storage.   - roles/storage.objectAdmin - Full
+        control of Google Cloud Storage objects.   The legacy IAM roles are:
+        - roles/storage.legacyObjectReader - Read-only access to objects
+        without listing. Equivalent to an ACL entry on an object with the
+        READER role.   - roles/storage.legacyObjectOwner - Read/write access
+        to existing objects without listing. Equivalent to an ACL entry on an
+        object with the OWNER role.   - roles/storage.legacyBucketReader -
+        Read access to buckets with object listing. Equivalent to an ACL entry
+        on a bucket with the READER role.   - roles/storage.legacyBucketWriter
+        - Read access to buckets with object listing/creation/deletion.
+        Equivalent to an ACL entry on a bucket with the WRITER role.   -
+        roles/storage.legacyBucketOwner - Read and write access to existing
+        buckets with object listing/creation/deletion. Equivalent to an ACL
+        entry on a bucket with the OWNER role.
+    """
+
+    condition = _messages.MessageField('extra_types.JsonValue', 1)
+    members = _messages.StringField(2, repeated=True)
+    role = _messages.StringField(3)
+
+  bindings = _messages.MessageField('BindingsValueListEntry', 1, repeated=True)
+  etag = _messages.BytesField(2)
+  kind = _messages.StringField(3, default=u'storage#policy')
+  resourceId = _messages.StringField(4)
+
+
+class RewriteResponse(_messages.Message):
+  r"""A rewrite response.
+
+  Fields:
+    done: true if the copy is finished; otherwise, false if the copy is in
+      progress. This property is always present in the response.
+    kind: The kind of item this is.
+    objectSize: The total size of the object being copied in bytes. This
+      property is always present in the response.
+    resource: A resource containing the metadata for the copied-to object.
+      This property is present in the response only when copying completes.
+    rewriteToken: A token to use in subsequent requests to continue copying
+      data. This token is present in the response only when there is more data
+      to copy.
+    totalBytesRewritten: The total bytes written so far, which can be used to
+      provide a waiting user with a progress indicator. This property is
+      always present in the response.
+  """
+
+  done = _messages.BooleanField(1)
+  kind = _messages.StringField(2, default=u'storage#rewriteResponse')
+  objectSize = _messages.IntegerField(3)
+  resource = _messages.MessageField('Object', 4)
+  rewriteToken = _messages.StringField(5)
+  totalBytesRewritten = _messages.IntegerField(6)
+
+
+class ServiceAccount(_messages.Message):
+  r"""A subscription to receive Google PubSub notifications.
+
+  Fields:
+    email_address: The ID of the notification.
+    kind: The kind of item this is. For notifications, this is always
+      storage#notification.
+  """
+
+  email_address = _messages.StringField(1)
+  kind = _messages.StringField(2, default=u'storage#serviceAccount')
+
+
+class StandardQueryParameters(_messages.Message):
+  r"""Query parameters accepted by all methods.
+
+  Enums:
+    AltValueValuesEnum: Data format for the response.
+
+  Fields:
+    alt: Data format for the response.
+    fields: Selector specifying which fields to include in a partial response.
+    key: API key. Your API key identifies your project and provides you with
+      API access, quota, and reports. Required unless you provide an OAuth 2.0
+      token.
+    oauth_token: OAuth 2.0 token for the current user.
+    prettyPrint: Returns response with indentations and line breaks.
+    quotaUser: An opaque string that represents a user for quota purposes.
+      Must not exceed 40 characters.
+    trace: A tracing token of the form "token:<tokenid>" to include in api
+      requests.
+    userIp: Deprecated. Please use quotaUser instead.
+  """
+  class AltValueValuesEnum(_messages.Enum):
+    r"""Data format for the response.
+
+    Values:
+      json: Responses with Content-Type of application/json
+    """
+    json = 0
+
+  alt = _messages.EnumField('AltValueValuesEnum', 1, default=u'json')
+  fields = _messages.StringField(2)
+  key = _messages.StringField(3)
+  oauth_token = _messages.StringField(4)
+  prettyPrint = _messages.BooleanField(5, default=True)
+  quotaUser = _messages.StringField(6)
+  trace = _messages.StringField(7)
+  userIp = _messages.StringField(8)
+
+
+class StorageBucketAccessControlsDeleteRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsDeleteRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketAccessControlsDeleteResponse(_messages.Message):
+  r"""An empty StorageBucketAccessControlsDelete response."""
+
+
+class StorageBucketAccessControlsGetRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsGetRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketAccessControlsInsertRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsInsertRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketAccessControl: A BucketAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  bucketAccessControl = _messages.MessageField('BucketAccessControl', 2)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketAccessControlsListRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsListRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  userProject = _messages.StringField(2)
+
+
+class StorageBucketAccessControlsPatchRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsPatchRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketAccessControl: A BucketAccessControl resource to be passed as the
+      request body.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  bucketAccessControl = _messages.MessageField('BucketAccessControl', 2)
+  entity = _messages.StringField(3, required=True)
+  userProject = _messages.StringField(4)
+
+
+class StorageBucketAccessControlsUpdateRequest(_messages.Message):
+  r"""A StorageBucketAccessControlsUpdateRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketAccessControl: A BucketAccessControl resource to be passed as the
+      request body.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  bucketAccessControl = _messages.MessageField('BucketAccessControl', 2)
+  entity = _messages.StringField(3, required=True)
+  userProject = _messages.StringField(4)
+
+
+class StorageBucketsDeleteRequest(_messages.Message):
+  r"""A StorageBucketsDeleteRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    ifMetagenerationMatch: If set, only deletes the bucket if its
+      metageneration matches this value.
+    ifMetagenerationNotMatch: If set, only deletes the bucket if its
+      metageneration does not match this value.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  ifMetagenerationMatch = _messages.IntegerField(2)
+  ifMetagenerationNotMatch = _messages.IntegerField(3)
+  userProject = _messages.StringField(4)
+
+
+class StorageBucketsDeleteResponse(_messages.Message):
+  r"""An empty StorageBucketsDelete response."""
+
+
+class StorageBucketsGetIamPolicyRequest(_messages.Message):
+  r"""A StorageBucketsGetIamPolicyRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  userProject = _messages.StringField(2)
+
+
+class StorageBucketsGetRequest(_messages.Message):
+  r"""A StorageBucketsGetRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    bucket: Name of a bucket.
+    ifMetagenerationMatch: Makes the return of the bucket metadata conditional
+      on whether the bucket's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the return of the bucket metadata
+      conditional on whether the bucket's current metageneration does not
+      match the given value.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  ifMetagenerationMatch = _messages.IntegerField(2)
+  ifMetagenerationNotMatch = _messages.IntegerField(3)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 4)
+  userProject = _messages.StringField(5)
+
+
+class StorageBucketsInsertRequest(_messages.Message):
+  r"""A StorageBucketsInsertRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this bucket.
+    PredefinedDefaultObjectAclValueValuesEnum: Apply a predefined set of
+      default object access controls to this bucket.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl,
+      unless the bucket resource specifies acl or defaultObjectAcl properties,
+      when it defaults to full.
+
+  Fields:
+    bucket: A Bucket resource to be passed as the request body.
+    predefinedAcl: Apply a predefined set of access controls to this bucket.
+    predefinedDefaultObjectAcl: Apply a predefined set of default object
+      access controls to this bucket.
+    project: A valid API project identifier.
+    projection: Set of properties to return. Defaults to noAcl, unless the
+      bucket resource specifies acl or defaultObjectAcl properties, when it
+      defaults to full.
+    userProject: The project to be billed for this request.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this bucket.
+
+    Values:
+      authenticatedRead: Project team owners get OWNER access, and
+        allAuthenticatedUsers get READER access.
+      private: Project team owners get OWNER access.
+      projectPrivate: Project team members get access according to their
+        roles.
+      publicRead: Project team owners get OWNER access, and allUsers get
+        READER access.
+      publicReadWrite: Project team owners get OWNER access, and allUsers get
+        WRITER access.
+    """
+    authenticatedRead = 0
+    private = 1
+    projectPrivate = 2
+    publicRead = 3
+    publicReadWrite = 4
+
+  class PredefinedDefaultObjectAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of default object access controls to this
+    bucket.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl, unless the bucket
+    resource specifies acl or defaultObjectAcl properties, when it defaults to
+    full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.MessageField('Bucket', 1)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 2)
+  predefinedDefaultObjectAcl = _messages.EnumField(
+      'PredefinedDefaultObjectAclValueValuesEnum', 3)
+  project = _messages.StringField(4, required=True)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 5)
+  userProject = _messages.StringField(6)
+
+
+class StorageBucketsListRequest(_messages.Message):
+  r"""A StorageBucketsListRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    maxResults: Maximum number of buckets to return in a single response. The
+      service will use this parameter or 1,000 items, whichever is smaller.
+    pageToken: A previously-returned page token representing part of the
+      larger set of results to view.
+    prefix: Filter results to buckets whose names begin with this prefix.
+    project: A valid API project identifier.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  maxResults = _messages.IntegerField(
+      1, variant=_messages.Variant.UINT32, default=1000)
+  pageToken = _messages.StringField(2)
+  prefix = _messages.StringField(3)
+  project = _messages.StringField(4, required=True)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 5)
+  userProject = _messages.StringField(6)
+
+
+class StorageBucketsLockRetentionPolicyRequest(_messages.Message):
+  r"""A StorageBucketsLockRetentionPolicyRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    ifMetagenerationMatch: Makes the operation conditional on whether bucket's
+      current metageneration matches the given value.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  ifMetagenerationMatch = _messages.IntegerField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketsPatchRequest(_messages.Message):
+  r"""A StorageBucketsPatchRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this bucket.
+    PredefinedDefaultObjectAclValueValuesEnum: Apply a predefined set of
+      default object access controls to this bucket.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to full.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketResource: A Bucket resource to be passed as the request body.
+    ifMetagenerationMatch: Makes the return of the bucket metadata conditional
+      on whether the bucket's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the return of the bucket metadata
+      conditional on whether the bucket's current metageneration does not
+      match the given value.
+    predefinedAcl: Apply a predefined set of access controls to this bucket.
+    predefinedDefaultObjectAcl: Apply a predefined set of default object
+      access controls to this bucket.
+    projection: Set of properties to return. Defaults to full.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this bucket.
+
+    Values:
+      authenticatedRead: Project team owners get OWNER access, and
+        allAuthenticatedUsers get READER access.
+      private: Project team owners get OWNER access.
+      projectPrivate: Project team members get access according to their
+        roles.
+      publicRead: Project team owners get OWNER access, and allUsers get
+        READER access.
+      publicReadWrite: Project team owners get OWNER access, and allUsers get
+        WRITER access.
+    """
+    authenticatedRead = 0
+    private = 1
+    projectPrivate = 2
+    publicRead = 3
+    publicReadWrite = 4
+
+  class PredefinedDefaultObjectAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of default object access controls to this
+    bucket.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  bucketResource = _messages.MessageField('Bucket', 2)
+  ifMetagenerationMatch = _messages.IntegerField(3)
+  ifMetagenerationNotMatch = _messages.IntegerField(4)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 5)
+  predefinedDefaultObjectAcl = _messages.EnumField(
+      'PredefinedDefaultObjectAclValueValuesEnum', 6)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 7)
+  userProject = _messages.StringField(8)
+
+
+class StorageBucketsSetIamPolicyRequest(_messages.Message):
+  r"""A StorageBucketsSetIamPolicyRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    policy: A Policy resource to be passed as the request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  policy = _messages.MessageField('Policy', 2)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketsTestIamPermissionsRequest(_messages.Message):
+  r"""A StorageBucketsTestIamPermissionsRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    permissions: Permissions to test.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  permissions = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageBucketsUpdateRequest(_messages.Message):
+  r"""A StorageBucketsUpdateRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this bucket.
+    PredefinedDefaultObjectAclValueValuesEnum: Apply a predefined set of
+      default object access controls to this bucket.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to full.
+
+  Fields:
+    bucket: Name of a bucket.
+    bucketResource: A Bucket resource to be passed as the request body.
+    ifMetagenerationMatch: Makes the return of the bucket metadata conditional
+      on whether the bucket's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the return of the bucket metadata
+      conditional on whether the bucket's current metageneration does not
+      match the given value.
+    predefinedAcl: Apply a predefined set of access controls to this bucket.
+    predefinedDefaultObjectAcl: Apply a predefined set of default object
+      access controls to this bucket.
+    projection: Set of properties to return. Defaults to full.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this bucket.
+
+    Values:
+      authenticatedRead: Project team owners get OWNER access, and
+        allAuthenticatedUsers get READER access.
+      private: Project team owners get OWNER access.
+      projectPrivate: Project team members get access according to their
+        roles.
+      publicRead: Project team owners get OWNER access, and allUsers get
+        READER access.
+      publicReadWrite: Project team owners get OWNER access, and allUsers get
+        WRITER access.
+    """
+    authenticatedRead = 0
+    private = 1
+    projectPrivate = 2
+    publicRead = 3
+    publicReadWrite = 4
+
+  class PredefinedDefaultObjectAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of default object access controls to this
+    bucket.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit owner, acl and defaultObjectAcl properties.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  bucketResource = _messages.MessageField('Bucket', 2)
+  ifMetagenerationMatch = _messages.IntegerField(3)
+  ifMetagenerationNotMatch = _messages.IntegerField(4)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 5)
+  predefinedDefaultObjectAcl = _messages.EnumField(
+      'PredefinedDefaultObjectAclValueValuesEnum', 6)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 7)
+  userProject = _messages.StringField(8)
+
+
+class StorageChannelsStopResponse(_messages.Message):
+  r"""An empty StorageChannelsStop response."""
+
+
+class StorageDefaultObjectAccessControlsDeleteRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsDeleteRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageDefaultObjectAccessControlsDeleteResponse(_messages.Message):
+  r"""An empty StorageDefaultObjectAccessControlsDelete response."""
+
+
+class StorageDefaultObjectAccessControlsGetRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsGetRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageDefaultObjectAccessControlsInsertRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsInsertRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 2)
+  userProject = _messages.StringField(3)
+
+
+class StorageDefaultObjectAccessControlsListRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsListRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    ifMetagenerationMatch: If present, only return default ACL listing if the
+      bucket's current metageneration matches this value.
+    ifMetagenerationNotMatch: If present, only return default ACL listing if
+      the bucket's current metageneration does not match the given value.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  ifMetagenerationMatch = _messages.IntegerField(2)
+  ifMetagenerationNotMatch = _messages.IntegerField(3)
+  userProject = _messages.StringField(4)
+
+
+class StorageDefaultObjectAccessControlsPatchRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsPatchRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 3)
+  userProject = _messages.StringField(4)
+
+
+class StorageDefaultObjectAccessControlsUpdateRequest(_messages.Message):
+  r"""A StorageDefaultObjectAccessControlsUpdateRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 3)
+  userProject = _messages.StringField(4)
+
+
+class StorageNotificationsDeleteRequest(_messages.Message):
+  r"""A StorageNotificationsDeleteRequest object.
+
+  Fields:
+    bucket: The parent bucket of the notification.
+    notification: ID of the notification to delete.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  notification = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageNotificationsDeleteResponse(_messages.Message):
+  r"""An empty StorageNotificationsDelete response."""
+
+
+class StorageNotificationsGetRequest(_messages.Message):
+  r"""A StorageNotificationsGetRequest object.
+
+  Fields:
+    bucket: The parent bucket of the notification.
+    notification: Notification ID
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  notification = _messages.StringField(2, required=True)
+  userProject = _messages.StringField(3)
+
+
+class StorageNotificationsInsertRequest(_messages.Message):
+  r"""A StorageNotificationsInsertRequest object.
+
+  Fields:
+    bucket: The parent bucket of the notification.
+    notification: A Notification resource to be passed as the request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  notification = _messages.MessageField('Notification', 2)
+  userProject = _messages.StringField(3)
+
+
+class StorageNotificationsListRequest(_messages.Message):
+  r"""A StorageNotificationsListRequest object.
+
+  Fields:
+    bucket: Name of a Google Cloud Storage bucket.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  userProject = _messages.StringField(2)
+
+
+class StorageObjectAccessControlsDeleteRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsDeleteRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  generation = _messages.IntegerField(3)
+  object = _messages.StringField(4, required=True)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectAccessControlsDeleteResponse(_messages.Message):
+  r"""An empty StorageObjectAccessControlsDelete response."""
+
+
+class StorageObjectAccessControlsGetRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsGetRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  generation = _messages.IntegerField(3)
+  object = _messages.StringField(4, required=True)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectAccessControlsInsertRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsInsertRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 4)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectAccessControlsListRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsListRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  userProject = _messages.StringField(4)
+
+
+class StorageObjectAccessControlsPatchRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsPatchRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  generation = _messages.IntegerField(3)
+  object = _messages.StringField(4, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 5)
+  userProject = _messages.StringField(6)
+
+
+class StorageObjectAccessControlsUpdateRequest(_messages.Message):
+  r"""A StorageObjectAccessControlsUpdateRequest object.
+
+  Fields:
+    bucket: Name of a bucket.
+    entity: The entity holding the permission. Can be user-userId, user-
+      emailAddress, group-groupId, group-emailAddress, allUsers, or
+      allAuthenticatedUsers.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectAccessControl: A ObjectAccessControl resource to be passed as the
+      request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  entity = _messages.StringField(2, required=True)
+  generation = _messages.IntegerField(3)
+  object = _messages.StringField(4, required=True)
+  objectAccessControl = _messages.MessageField('ObjectAccessControl', 5)
+  userProject = _messages.StringField(6)
+
+
+class StorageObjectsComposeRequest(_messages.Message):
+  r"""A StorageObjectsComposeRequest object.
+
+  Enums:
+    DestinationPredefinedAclValueValuesEnum: Apply a predefined set of access
+      controls to the destination object.
+
+  Fields:
+    composeRequest: A ComposeRequest resource to be passed as the request
+      body.
+    destinationBucket: Name of the bucket containing the source objects. The
+      destination object is stored in this bucket.
+    destinationObject: Name of the new object. For information about how to
+      URL encode object names to be path safe, see Encoding URI Path Parts.
+    destinationPredefinedAcl: Apply a predefined set of access controls to the
+      destination object.
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    kmsKeyName: Resource name of the Cloud KMS key, of the form projects/my-
+      project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be
+      used to encrypt the object. Overrides the object metadata's kms_key_name
+      value, if any.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class DestinationPredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to the destination object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  composeRequest = _messages.MessageField('ComposeRequest', 1)
+  destinationBucket = _messages.StringField(2, required=True)
+  destinationObject = _messages.StringField(3, required=True)
+  destinationPredefinedAcl = _messages.EnumField(
+      'DestinationPredefinedAclValueValuesEnum', 4)
+  ifGenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationMatch = _messages.IntegerField(6)
+  kmsKeyName = _messages.StringField(7)
+  userProject = _messages.StringField(8)
+
+
+class StorageObjectsCopyRequest(_messages.Message):
+  r"""A StorageObjectsCopyRequest object.
+
+  Enums:
+    DestinationPredefinedAclValueValuesEnum: Apply a predefined set of access
+      controls to the destination object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl,
+      unless the object resource specifies the acl property, when it defaults
+      to full.
+
+  Fields:
+    destinationBucket: Name of the bucket in which to store the new object.
+      Overrides the provided object metadata's bucket value, if any.For
+      information about how to URL encode object names to be path safe, see
+      Encoding URI Path Parts.
+    destinationObject: Name of the new object. Required when the object
+      metadata is not otherwise provided. Overrides the object metadata's name
+      value, if any.
+    destinationPredefinedAcl: Apply a predefined set of access controls to the
+      destination object.
+    ifGenerationMatch: Makes the operation conditional on whether the
+      destination object's current generation matches the given value. Setting
+      to 0 makes the operation succeed only if there are no live versions of
+      the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      destination object's current generation does not match the given value.
+      If no live object exists, the precondition fails. Setting to 0 makes the
+      operation succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      destination object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      destination object's current metageneration does not match the given
+      value.
+    ifSourceGenerationMatch: Makes the operation conditional on whether the
+      source object's current generation matches the given value.
+    ifSourceGenerationNotMatch: Makes the operation conditional on whether the
+      source object's current generation does not match the given value.
+    ifSourceMetagenerationMatch: Makes the operation conditional on whether
+      the source object's current metageneration matches the given value.
+    ifSourceMetagenerationNotMatch: Makes the operation conditional on whether
+      the source object's current metageneration does not match the given
+      value.
+    object: A Object resource to be passed as the request body.
+    projection: Set of properties to return. Defaults to noAcl, unless the
+      object resource specifies the acl property, when it defaults to full.
+    sourceBucket: Name of the bucket in which to find the source object.
+    sourceGeneration: If present, selects a specific revision of the source
+      object (as opposed to the latest version, the default).
+    sourceObject: Name of the source object. For information about how to URL
+      encode object names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class DestinationPredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to the destination object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl, unless the object
+    resource specifies the acl property, when it defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  destinationBucket = _messages.StringField(1, required=True)
+  destinationObject = _messages.StringField(2, required=True)
+  destinationPredefinedAcl = _messages.EnumField(
+      'DestinationPredefinedAclValueValuesEnum', 3)
+  ifGenerationMatch = _messages.IntegerField(4)
+  ifGenerationNotMatch = _messages.IntegerField(5)
+  ifMetagenerationMatch = _messages.IntegerField(6)
+  ifMetagenerationNotMatch = _messages.IntegerField(7)
+  ifSourceGenerationMatch = _messages.IntegerField(8)
+  ifSourceGenerationNotMatch = _messages.IntegerField(9)
+  ifSourceMetagenerationMatch = _messages.IntegerField(10)
+  ifSourceMetagenerationNotMatch = _messages.IntegerField(11)
+  object = _messages.MessageField('Object', 12)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 13)
+  sourceBucket = _messages.StringField(14, required=True)
+  sourceGeneration = _messages.IntegerField(15)
+  sourceObject = _messages.StringField(16, required=True)
+  userProject = _messages.StringField(17)
+
+
+class StorageObjectsDeleteRequest(_messages.Message):
+  r"""A StorageObjectsDeleteRequest object.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, permanently deletes a specific revision of this
+      object (as opposed to the latest version, the default).
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  object = _messages.StringField(7, required=True)
+  userProject = _messages.StringField(8)
+
+
+class StorageObjectsDeleteResponse(_messages.Message):
+  r"""An empty StorageObjectsDelete response."""
+
+
+class StorageObjectsGetIamPolicyRequest(_messages.Message):
+  r"""A StorageObjectsGetIamPolicyRequest object.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  userProject = _messages.StringField(4)
+
+
+class StorageObjectsGetRequest(_messages.Message):
+  r"""A StorageObjectsGetRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  object = _messages.StringField(7, required=True)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 8)
+  userProject = _messages.StringField(9)
+
+
+class StorageObjectsInsertRequest(_messages.Message):
+  r"""A StorageObjectsInsertRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl,
+      unless the object resource specifies the acl property, when it defaults
+      to full.
+
+  Fields:
+    bucket: Name of the bucket in which to store the new object. Overrides the
+      provided object metadata's bucket value, if any.
+    contentEncoding: If set, sets the contentEncoding property of the final
+      object to this value. Setting this parameter is equivalent to setting
+      the contentEncoding metadata property. This can be useful when uploading
+      an object with uploadType=media to indicate the encoding of the content
+      being uploaded.
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    kmsKeyName: Resource name of the Cloud KMS key, of the form projects/my-
+      project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be
+      used to encrypt the object. Overrides the object metadata's kms_key_name
+      value, if any.
+    name: Name of the object. Required when the object metadata is not
+      otherwise provided. Overrides the object metadata's name value, if any.
+      For information about how to URL encode object names to be path safe,
+      see Encoding URI Path Parts.
+    object: A Object resource to be passed as the request body.
+    predefinedAcl: Apply a predefined set of access controls to this object.
+    projection: Set of properties to return. Defaults to noAcl, unless the
+      object resource specifies the acl property, when it defaults to full.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl, unless the object
+    resource specifies the acl property, when it defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  contentEncoding = _messages.StringField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  kmsKeyName = _messages.StringField(7)
+  name = _messages.StringField(8)
+  object = _messages.MessageField('Object', 9)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 10)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 11)
+  userProject = _messages.StringField(12)
+
+
+class StorageObjectsListRequest(_messages.Message):
+  r"""A StorageObjectsListRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    bucket: Name of the bucket in which to look for objects.
+    delimiter: Returns results in a directory-like mode. items will contain
+      only objects whose names, aside from the prefix, do not contain
+      delimiter. Objects whose names, aside from the prefix, contain delimiter
+      will have their name, truncated after the delimiter, returned in
+      prefixes. Duplicate prefixes are omitted.
+    includeTrailingDelimiter: If true, objects that end in exactly one
+      instance of delimiter will have their metadata included in items in
+      addition to prefixes.
+    maxResults: Maximum number of items plus prefixes to return in a single
+      page of responses. As duplicate prefixes are omitted, fewer total
+      results may be returned than requested. The service will use this
+      parameter or 1,000 items, whichever is smaller.
+    pageToken: A previously-returned page token representing part of the
+      larger set of results to view.
+    prefix: Filter results to objects whose names begin with this prefix.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+    versions: If true, lists all versions of an object as distinct results.
+      The default is false. For more information, see Object Versioning.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  delimiter = _messages.StringField(2)
+  includeTrailingDelimiter = _messages.BooleanField(3)
+  maxResults = _messages.IntegerField(
+      4, variant=_messages.Variant.UINT32, default=1000)
+  pageToken = _messages.StringField(5)
+  prefix = _messages.StringField(6)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 7)
+  userProject = _messages.StringField(8)
+  versions = _messages.BooleanField(9)
+
+
+class StorageObjectsPatchRequest(_messages.Message):
+  r"""A StorageObjectsPatchRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to full.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectResource: A Object resource to be passed as the request body.
+    predefinedAcl: Apply a predefined set of access controls to this object.
+    projection: Set of properties to return. Defaults to full.
+    userProject: The project to be billed for this request, for Requester Pays
+      buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  object = _messages.StringField(7, required=True)
+  objectResource = _messages.MessageField('Object', 8)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 9)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 10)
+  userProject = _messages.StringField(11)
+
+
+class StorageObjectsRewriteRequest(_messages.Message):
+  r"""A StorageObjectsRewriteRequest object.
+
+  Enums:
+    DestinationPredefinedAclValueValuesEnum: Apply a predefined set of access
+      controls to the destination object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl,
+      unless the object resource specifies the acl property, when it defaults
+      to full.
+
+  Fields:
+    destinationBucket: Name of the bucket in which to store the new object.
+      Overrides the provided object metadata's bucket value, if any.
+    destinationKmsKeyName: Resource name of the Cloud KMS key, of the form
+      projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key,
+      that will be used to encrypt the object. Overrides the object metadata's
+      kms_key_name value, if any.
+    destinationObject: Name of the new object. Required when the object
+      metadata is not otherwise provided. Overrides the object metadata's name
+      value, if any. For information about how to URL encode object names to
+      be path safe, see Encoding URI Path Parts.
+    destinationPredefinedAcl: Apply a predefined set of access controls to the
+      destination object.
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      destination object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      destination object's current metageneration does not match the given
+      value.
+    ifSourceGenerationMatch: Makes the operation conditional on whether the
+      source object's current generation matches the given value.
+    ifSourceGenerationNotMatch: Makes the operation conditional on whether the
+      source object's current generation does not match the given value.
+    ifSourceMetagenerationMatch: Makes the operation conditional on whether
+      the source object's current metageneration matches the given value.
+    ifSourceMetagenerationNotMatch: Makes the operation conditional on whether
+      the source object's current metageneration does not match the given
+      value.
+    maxBytesRewrittenPerCall: The maximum number of bytes that will be
+      rewritten per rewrite request. Most callers shouldn't need to specify
+      this parameter - it is primarily in place to support testing. If
+      specified the value must be an integral multiple of 1 MiB (1048576).
+      Also, this only applies to requests where the source and destination
+      span locations and/or storage classes. Finally, this value must not
+      change across rewrite calls else you'll get an error that the
+      rewriteToken is invalid.
+    object: A Object resource to be passed as the request body.
+    projection: Set of properties to return. Defaults to noAcl, unless the
+      object resource specifies the acl property, when it defaults to full.
+    rewriteToken: Include this field (from the previous rewrite response) on
+      each rewrite request after the first one, until the rewrite response
+      'done' flag is true. Calls that provide a rewriteToken can omit all
+      other request fields, but if included those fields must match the values
+      provided in the first rewrite request.
+    sourceBucket: Name of the bucket in which to find the source object.
+    sourceGeneration: If present, selects a specific revision of the source
+      object (as opposed to the latest version, the default).
+    sourceObject: Name of the source object. For information about how to URL
+      encode object names to be path safe, see Encoding URI Path Parts.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class DestinationPredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to the destination object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl, unless the object
+    resource specifies the acl property, when it defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  destinationBucket = _messages.StringField(1, required=True)
+  destinationKmsKeyName = _messages.StringField(2)
+  destinationObject = _messages.StringField(3, required=True)
+  destinationPredefinedAcl = _messages.EnumField(
+      'DestinationPredefinedAclValueValuesEnum', 4)
+  ifGenerationMatch = _messages.IntegerField(5)
+  ifGenerationNotMatch = _messages.IntegerField(6)
+  ifMetagenerationMatch = _messages.IntegerField(7)
+  ifMetagenerationNotMatch = _messages.IntegerField(8)
+  ifSourceGenerationMatch = _messages.IntegerField(9)
+  ifSourceGenerationNotMatch = _messages.IntegerField(10)
+  ifSourceMetagenerationMatch = _messages.IntegerField(11)
+  ifSourceMetagenerationNotMatch = _messages.IntegerField(12)
+  maxBytesRewrittenPerCall = _messages.IntegerField(13)
+  object = _messages.MessageField('Object', 14)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 15)
+  rewriteToken = _messages.StringField(16)
+  sourceBucket = _messages.StringField(17, required=True)
+  sourceGeneration = _messages.IntegerField(18)
+  sourceObject = _messages.StringField(19, required=True)
+  userProject = _messages.StringField(20)
+
+
+class StorageObjectsSetIamPolicyRequest(_messages.Message):
+  r"""A StorageObjectsSetIamPolicyRequest object.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    policy: A Policy resource to be passed as the request body.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  policy = _messages.MessageField('Policy', 4)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectsTestIamPermissionsRequest(_messages.Message):
+  r"""A StorageObjectsTestIamPermissionsRequest object.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    permissions: Permissions to test.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  object = _messages.StringField(3, required=True)
+  permissions = _messages.StringField(4, required=True)
+  userProject = _messages.StringField(5)
+
+
+class StorageObjectsUpdateRequest(_messages.Message):
+  r"""A StorageObjectsUpdateRequest object.
+
+  Enums:
+    PredefinedAclValueValuesEnum: Apply a predefined set of access controls to
+      this object.
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to full.
+
+  Fields:
+    bucket: Name of the bucket in which the object resides.
+    generation: If present, selects a specific revision of this object (as
+      opposed to the latest version, the default).
+    ifGenerationMatch: Makes the operation conditional on whether the object's
+      current generation matches the given value. Setting to 0 makes the
+      operation succeed only if there are no live versions of the object.
+    ifGenerationNotMatch: Makes the operation conditional on whether the
+      object's current generation does not match the given value. If no live
+      object exists, the precondition fails. Setting to 0 makes the operation
+      succeed only if there is a live version of the object.
+    ifMetagenerationMatch: Makes the operation conditional on whether the
+      object's current metageneration matches the given value.
+    ifMetagenerationNotMatch: Makes the operation conditional on whether the
+      object's current metageneration does not match the given value.
+    object: Name of the object. For information about how to URL encode object
+      names to be path safe, see Encoding URI Path Parts.
+    objectResource: A Object resource to be passed as the request body.
+    predefinedAcl: Apply a predefined set of access controls to this object.
+    projection: Set of properties to return. Defaults to full.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+  """
+  class PredefinedAclValueValuesEnum(_messages.Enum):
+    r"""Apply a predefined set of access controls to this object.
+
+    Values:
+      authenticatedRead: Object owner gets OWNER access, and
+        allAuthenticatedUsers get READER access.
+      bucketOwnerFullControl: Object owner gets OWNER access, and project team
+        owners get OWNER access.
+      bucketOwnerRead: Object owner gets OWNER access, and project team owners
+        get READER access.
+      private: Object owner gets OWNER access.
+      projectPrivate: Object owner gets OWNER access, and project team members
+        get access according to their roles.
+      publicRead: Object owner gets OWNER access, and allUsers get READER
+        access.
+    """
+    authenticatedRead = 0
+    bucketOwnerFullControl = 1
+    bucketOwnerRead = 2
+    private = 3
+    projectPrivate = 4
+    publicRead = 5
+
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to full.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  generation = _messages.IntegerField(2)
+  ifGenerationMatch = _messages.IntegerField(3)
+  ifGenerationNotMatch = _messages.IntegerField(4)
+  ifMetagenerationMatch = _messages.IntegerField(5)
+  ifMetagenerationNotMatch = _messages.IntegerField(6)
+  object = _messages.StringField(7, required=True)
+  objectResource = _messages.MessageField('Object', 8)
+  predefinedAcl = _messages.EnumField('PredefinedAclValueValuesEnum', 9)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 10)
+  userProject = _messages.StringField(11)
+
+
+class StorageObjectsWatchAllRequest(_messages.Message):
+  r"""A StorageObjectsWatchAllRequest object.
+
+  Enums:
+    ProjectionValueValuesEnum: Set of properties to return. Defaults to noAcl.
+
+  Fields:
+    bucket: Name of the bucket in which to look for objects.
+    channel: A Channel resource to be passed as the request body.
+    delimiter: Returns results in a directory-like mode. items will contain
+      only objects whose names, aside from the prefix, do not contain
+      delimiter. Objects whose names, aside from the prefix, contain delimiter
+      will have their name, truncated after the delimiter, returned in
+      prefixes. Duplicate prefixes are omitted.
+    includeTrailingDelimiter: If true, objects that end in exactly one
+      instance of delimiter will have their metadata included in items in
+      addition to prefixes.
+    maxResults: Maximum number of items plus prefixes to return in a single
+      page of responses. As duplicate prefixes are omitted, fewer total
+      results may be returned than requested. The service will use this
+      parameter or 1,000 items, whichever is smaller.
+    pageToken: A previously-returned page token representing part of the
+      larger set of results to view.
+    prefix: Filter results to objects whose names begin with this prefix.
+    projection: Set of properties to return. Defaults to noAcl.
+    userProject: The project to be billed for this request. Required for
+      Requester Pays buckets.
+    versions: If true, lists all versions of an object as distinct results.
+      The default is false. For more information, see Object Versioning.
+  """
+  class ProjectionValueValuesEnum(_messages.Enum):
+    r"""Set of properties to return. Defaults to noAcl.
+
+    Values:
+      full: Include all properties.
+      noAcl: Omit the owner, acl property.
+    """
+    full = 0
+    noAcl = 1
+
+  bucket = _messages.StringField(1, required=True)
+  channel = _messages.MessageField('Channel', 2)
+  delimiter = _messages.StringField(3)
+  includeTrailingDelimiter = _messages.BooleanField(4)
+  maxResults = _messages.IntegerField(
+      5, variant=_messages.Variant.UINT32, default=1000)
+  pageToken = _messages.StringField(6)
+  prefix = _messages.StringField(7)
+  projection = _messages.EnumField('ProjectionValueValuesEnum', 8)
+  userProject = _messages.StringField(9)
+  versions = _messages.BooleanField(10)
+
+
+class StorageProjectsServiceAccountGetRequest(_messages.Message):
+  r"""A StorageProjectsServiceAccountGetRequest object.
+
+  Fields:
+    projectId: Project ID
+    userProject: The project to be billed for this request.
+  """
+
+  projectId = _messages.StringField(1, required=True)
+  userProject = _messages.StringField(2)
+
+
+class TestIamPermissionsResponse(_messages.Message):
+  r"""A storage.(buckets|objects).testIamPermissions response.
+
+  Fields:
+    kind: The kind of item this is.
+    permissions: The permissions held by the caller. Permissions are always of
+      the format storage.resource.capability, where resource is one of buckets
+      or objects. The supported permissions are as follows:   -
+      storage.buckets.delete - Delete bucket.   - storage.buckets.get - Read
+      bucket metadata.   - storage.buckets.getIamPolicy - Read bucket IAM
+      policy.   - storage.buckets.create - Create bucket.   -
+      storage.buckets.list - List buckets.   - storage.buckets.setIamPolicy -
+      Update bucket IAM policy.   - storage.buckets.update - Update bucket
+      metadata.   - storage.objects.delete - Delete object.   -
+      storage.objects.get - Read object data and metadata.   -
+      storage.objects.getIamPolicy - Read object IAM policy.   -
+      storage.objects.create - Create object.   - storage.objects.list - List
+      objects.   - storage.objects.setIamPolicy - Update object IAM policy.
+      - storage.objects.update - Update object metadata.
+  """
+
+  kind = _messages.StringField(1, default=u'storage#testIamPermissionsResponse')
+  permissions = _messages.StringField(2, repeated=True)
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index d9dea3efdc4..ce37688ef82 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -95,7 +95,6 @@ class SetupTest(unittest.TestCase):
             errors, ['project', 'staging_location', 'temp_location', 'region']),
         [])
 
-  @unittest.skip('Not compatible with new GCS client. See GH issue #26335.')
   def test_gcs_path(self):
     def get_validator(temp_location, staging_location):
       options = ['--project=example:example', '--job_name=job']
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 0d08a7d5f1d..bffcc6d6634 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -55,6 +55,7 @@ from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import StandardOptions
@@ -554,7 +555,6 @@ class DataflowApplicationClient(object):
         root_staging_location or self.google_cloud_options.staging_location)
 
     from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2
-    from google.cloud import storage
     if _is_runner_v2(options):
       self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
     else:
@@ -562,10 +562,8 @@ class DataflowApplicationClient(object):
 
     if self.google_cloud_options.no_auth:
       credentials = None
-      storage_credentials = None
     else:
       credentials = get_service_credentials(options)
-      storage_credentials = credentials.get_google_auth_credentials()
 
     http_client = get_new_http()
     self._client = dataflow.DataflowV1b3(
@@ -574,10 +572,12 @@ class DataflowApplicationClient(object):
         get_credentials=(not self.google_cloud_options.no_auth),
         http=http_client,
         response_encoding=get_response_encoding())
-    if credentials:
-      self._storage_client = storage.Client(credentials=storage_credentials)
-    else:
-      self._storage_client = storage.Client.create_anonymous_client()
+    self._storage_client = storage.StorageV1(
+        url='https://www.googleapis.com/storage/v1',
+        credentials=credentials,
+        get_credentials=(not self.google_cloud_options.no_auth),
+        http=http_client,
+        response_encoding=get_response_encoding())
     self._sdk_image_overrides = self._get_sdk_image_overrides(options)
 
   def _get_sdk_image_overrides(self, pipeline_options):
@@ -720,8 +720,6 @@ class DataflowApplicationClient(object):
       mime_type='application/octet-stream',
       total_size=None):
     """Stages a file at a GCS or local path with stream-supplied contents."""
-    from google.cloud.exceptions import Forbidden
-    from google.cloud.exceptions import NotFound
     if not gcs_or_local_path.startswith('gs://'):
       local_path = FileSystems.join(gcs_or_local_path, file_name)
       _LOGGER.info('Staging file locally to %s', local_path)
@@ -729,29 +727,31 @@ class DataflowApplicationClient(object):
         f.write(stream.read())
       return
     gcs_location = FileSystems.join(gcs_or_local_path, file_name)
+    bucket, name = gcs_location[5:].split('/', 1)
+
+    request = storage.StorageObjectsInsertRequest(bucket=bucket, name=name)
     start_time = time.time()
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
+    upload = storage.Upload(stream, mime_type, total_size)
     try:
-      with FileSystems.create(gcs_location) as f:
-        f.write(stream.read())
-        return
-    except Exception as e:
-      reportable_errors = [
-          Forbidden,
-          NotFound,
-      ]
-      if type(e) in reportable_errors:
+      response = self._storage_client.objects.Insert(request, upload=upload)
+    except exceptions.HttpError as e:
+      reportable_errors = {
+          403: 'access denied',
+          404: 'bucket not found',
+      }
+      if e.status_code in reportable_errors:
         raise IOError((
             'Could not upload to GCS path %s: %s. Please verify '
-            'that credentials are valid, that the specified path '
-            'exists, and that you have write access to it.') %
-                      (gcs_or_local_path, e))
+            'that credentials are valid and that you have write '
+            'access to the specified path.') %
+                      (gcs_or_local_path, reportable_errors[e.status_code]))
       raise
-    finally:
-      _LOGGER.info(
-          'Completed GCS upload to %s in %s seconds.',
-          gcs_location,
-          int(time.time() - start_time))
+    _LOGGER.info(
+        'Completed GCS upload to %s in %s seconds.',
+        gcs_location,
+        int(time.time() - start_time))
+    return response
 
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def create_job(self, job):
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index b5fcee48c29..bdd9ab4d1a8 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -33,6 +33,8 @@ import apache_beam as beam
 from apache_beam.dataframe.convert import to_pcollection
 from apache_beam.dataframe.frame_base import DeferredBase
 from apache_beam.internal.gcp import auth
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import Pipeline
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -449,23 +451,21 @@ def assert_bucket_exists(bucket_name):
     Logs a warning if the bucket cannot be verified to exist.
   """
   try:
-    from google.cloud.exceptions import ClientError
-    from google.cloud.exceptions import NotFound
-    from google.cloud import storage
-    credentials = auth.get_service_credentials(PipelineOptions())
-    if credentials:
-      storage_client = storage.Client(credentials=credentials)
-    else:
-      storage_client = storage.Client.create_anonymous_client()
-    storage_client.get_bucket(bucket_name)
-  except ClientError as e:
-    if isinstance(e, NotFound):
+    from apitools.base.py.exceptions import HttpError
+    storage_client = storage.StorageV1(
+        credentials=auth.get_service_credentials(PipelineOptions()),
+        get_credentials=False,
+        http=get_new_http(),
+        response_encoding='utf8')
+    request = storage.StorageBucketsGetRequest(bucket=bucket_name)
+    storage_client.buckets.Get(request)
+  except HttpError as e:
+    if e.status_code == 404:
       _LOGGER.error('%s bucket does not exist!', bucket_name)
       raise ValueError('Invalid GCS bucket provided!')
     else:
       _LOGGER.warning(
-          'ClientError - unable to verify whether bucket %s exists',
-          bucket_name)
+          'HttpError - unable to verify whether bucket %s exists', bucket_name)
   except ImportError:
     _LOGGER.warning(
         'ImportError - unable to verify whether bucket %s exists', bucket_name)
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index f3d7f96b0db..ecb71a2bdef 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -46,22 +46,27 @@ from apache_beam.utils.windowed_value import WindowedValue
 
 # Protect against environments where apitools library is not available.
 try:
-  from google.cloud.exceptions import BadRequest, NotFound
+  from apitools.base.py.exceptions import HttpError
+  from apitools.base.py.exceptions import HttpNotFoundError
 except ImportError:
   _http_error_imported = False
+  HttpError = ValueError
+  HttpNotFoundError = ValueError
 else:
   _http_error_imported = True
 
 
-class MockStorageClient():
-  def __init__(self):
-    pass
-
-  def get_bucket(self, path):
+class MockBuckets():
+  def Get(self, path):
     if path == 'test-bucket-not-found':
-      raise NotFound('Bucket not found')
+      raise HttpNotFoundError({'status': 404}, {}, '')
     elif path == 'test-bucket-not-verified':
-      raise BadRequest('Request faulty')
+      raise HttpError({'status': 400}, {}, '')
+
+
+class MockStorageClient():
+  def __init__(self, buckets=MockBuckets()):
+    self.buckets = buckets
 
 
 class Record(NamedTuple):
@@ -348,21 +353,29 @@ class GeneralUtilTest(unittest.TestCase):
     self.assertIs(getattr(main_session, name, None), value)
 
 
-@patch('google.cloud.storage.Client', return_value=MockStorageClient())
+@patch(
+    'apache_beam.io.gcp.internal.clients.storage.StorageV1',
+    return_value=MockStorageClient())
 @unittest.skipIf(not _http_error_imported, 'http errors are not imported.')
 class GCSUtilsTest(unittest.TestCase):
-  @patch('google.cloud.storage.Client.get_bucket')
+  @patch(
+      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
+      return_value='test-bucket-not-found')
   def test_assert_bucket_exists_not_found(self, mock_response, mock_client):
     with self.assertRaises(ValueError):
-      utils.assert_bucket_exists('test-bucket-not-found')
+      utils.assert_bucket_exists('')
 
-  @patch('google.cloud.storage.Client.get_bucket')
+  @patch(
+      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
+      return_value='test-bucket-not-verified')
   def test_assert_bucket_exists_not_verified(self, mock_response, mock_client):
     from apache_beam.runners.interactive.utils import _LOGGER
     with self.assertLogs(_LOGGER, level='WARNING'):
-      utils.assert_bucket_exists('test-bucket-not-verified')
+      utils.assert_bucket_exists('')
 
-  @patch('google.cloud.storage.Client.get_bucket')
+  @patch(
+      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
+      return_value='test-bucket-found')
   def test_assert_bucket_exists_found(self, mock_response, mock_client):
     utils.assert_bucket_exists('')
 
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
index 0e8e0d97b46..19becd3e123 100644
--- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -41,6 +41,7 @@ from google.protobuf.json_format import MessageToJson
 from apache_beam import version as beam_version
 from apache_beam.internal.gcp.auth import get_service_credentials
 from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions  # pylint: disable=unused-import
 from apache_beam.options.pipeline_options import SetupOptions
@@ -209,12 +210,12 @@ class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
       credentials = None
     else:
       credentials = get_service_credentials(options)
-    from google.cloud import storage
-    if credentials:
-      self._storage_client = storage.Client(
-          credentials=credentials.get_google_auth_credentials())
-    else:
-      self._storage_client = storage.Client.create_anonymous_client()
+    self._storage_client = storage.StorageV1(
+        url='https://www.googleapis.com/storage/v1',
+        credentials=credentials,
+        get_credentials=(not self._google_cloud_options.no_auth),
+        http=get_new_http(),
+        response_encoding='utf8')
     self._cloudbuild_client = cloudbuild.CloudbuildV1(
         credentials=credentials,
         get_credentials=(not self._google_cloud_options.no_auth),
@@ -306,23 +307,27 @@ class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
         "Python SDK container built and pushed as %s." % container_image_name)
 
   def _upload_to_gcs(self, local_file_path, gcs_location):
-    bucket_name, blob_name = self._get_gcs_bucket_and_name(gcs_location)
+    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
+    request = storage.StorageObjectsInsertRequest(
+        bucket=gcs_bucket, name=gcs_object)
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    from google.cloud import storage
-    from google.cloud.exceptions import Forbidden
-    from google.cloud.exceptions import NotFound
+    total_size = os.path.getsize(local_file_path)
+    from apitools.base.py import exceptions
     try:
-      bucket = self._storage_client.get_bucket(bucket_name)
-      blob = bucket.get_blob(blob_name)
-      if not blob:
-        blob = storage.Blob(name=blob_name, bucket=bucket)
-      blob.upload_from_filename(local_file_path)
-    except Exception as e:
-      if isinstance(e, (Forbidden, NotFound)):
+      with open(local_file_path, 'rb') as stream:
+        upload = storage.Upload(stream, 'application/octet-stream', total_size)
+        self._storage_client.objects.Insert(request, upload=upload)
+    except exceptions.HttpError as e:
+      reportable_errors = {
+          403: 'access denied',
+          404: 'bucket not found',
+      }
+      if e.status_code in reportable_errors:
         raise IOError((
             'Could not upload to GCS path %s: %s. Please verify '
             'that credentials are valid and that you have write '
-            'access to the specified path.') % (gcs_location, e.message))
+            'access to the specified path.') %
+                      (gcs_location, reportable_errors[e.status_code]))
       raise
     _LOGGER.info('Completed GCS upload to %s.', gcs_location)
 
diff --git a/sdks/python/mypy.ini b/sdks/python/mypy.ini
index 298f249ffbf..46dea481f93 100644
--- a/sdks/python/mypy.ini
+++ b/sdks/python/mypy.ini
@@ -39,6 +39,9 @@ ignore_errors = true
 # error: Cannot infer type of lambda  [misc]
 ignore_errors = true
 
+[mypy-apache_beam.io.gcp.internal.clients.storage.storage_v1_client]
+ignore_errors = true
+
 [mypy-apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_client]
 ignore_errors = true
 
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 3601df72bc1..c36530e13b2 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -309,7 +309,6 @@ if __name__ == '__main__':
             'google-cloud-datastore>=2.0.0,<3',
             'google-cloud-pubsub>=2.1.0,<3',
             'google-cloud-pubsublite>=1.2.0,<2',
-            'google-cloud-storage>=2.10.0,<2.11.0',
             # GCP packages required by tests
             'google-cloud-bigquery>=2.0.0,<4',
             'google-cloud-bigquery-storage>=2.6.3,<3',