Posted to github@beam.apache.org by "johnjcasey (via GitHub)" <gi...@apache.org> on 2023/04/05 19:08:10 UTC

[GitHub] [beam] johnjcasey commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

johnjcasey commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1158892009


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -162,20 +144,13 @@ class GcsIOError(IOError, retry.PermanentException):
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None

Review Comment:
   can we keep typehints? or are they very different?
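
   For context, a minimal sketch of how the removed comment-style hint might be kept with the new client, assuming the replacement type is google.cloud.storage.Client (the hunk above does not show the new client type, so this is an assumption, not the PR's code):

    # Hypothetical sketch, not from the PR: same comment-style hint, new client type.
    from typing import Optional, Union

    from apache_beam.options.pipeline_options import PipelineOptions
    from google.cloud import storage


    class GcsIO(object):
      """Google Cloud Storage I/O client."""
      def __init__(self, storage_client=None, pipeline_options=None):
        # type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None
        self._storage_client = storage_client
        self._pipeline_options = pipeline_options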



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -303,149 +256,79 @@ def delete_batch(self, paths):
       paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
       if not paths_chunk:
         return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+      with self.client.batch():
+        for path in paths_chunk:
+          bucket, blob_path = parse_gcs_path(path)
+          blob = storage.Blob(blob_path, bucket)
+          exception = None
+          try:
+            blob.delete()
+          except Exception as err:
+            if isinstance(err, NotFound):
+              exception = None
+            else:
+              exception = err
+          finally:
+            result_statuses.append((path, exception))
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
+      !!!

Review Comment:
   typo?
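
   For reference, a standalone sketch of the idempotent-delete behavior in the delete_batch hunk above, written against the google-cloud-storage client; delete_paths and the path parsing are illustrative, not the PR's code. Inside client.batch() the individual requests are deferred until the context manager exits, so this simplified version issues one request per object instead:

    from google.api_core.exceptions import NotFound
    from google.cloud import storage

    def delete_paths(client, paths):
      """Delete gs://<bucket>/<name> paths, returning (path, exception) pairs."""
      results = []
      for path in paths:
        bucket_name, blob_name = path[len("gs://"):].split("/", 1)
        blob = client.bucket(bucket_name).blob(blob_name)
        try:
          blob.delete()
          results.append((path, None))
        except NotFound:
          # The object is already gone; report success for idempotency,
          # mirroring the 404 handling in the old storage v1 code.
          results.append((path, None))
      return results

    # Usage sketch (assumes default application credentials):
    # client = storage.Client()
    # statuses = delete_paths(client, ["gs://my-bucket/tmp/output-00000-of-00001"])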


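   Similarly, a minimal sketch of the slimmed-down copy(src, dest) against the google-cloud-storage client using Bucket.copy_blob; copy_object and the path parsing are illustrative and not the PR's implementation, and encryption is simply left to the destination bucket's defaults (which is what the removed docstring described for a None KMS key):

    from google.cloud import storage

    def copy_object(client, src, dest):
      """Copy the object at gs:// path src to gs:// path dest."""
      src_bucket_name, src_name = src[len("gs://"):].split("/", 1)
      dest_bucket_name, dest_name = dest[len("gs://"):].split("/", 1)
      src_bucket = client.bucket(src_bucket_name)
      dest_bucket = client.bucket(dest_bucket_name)
      # copy_blob copies the source blob into the destination bucket under
      # the destination object name.
      src_bucket.copy_blob(src_bucket.blob(src_name), dest_bucket, new_name=dest_name)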

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org