Posted to github@beam.apache.org by "Abacn (via GitHub)" <gi...@apache.org> on 2023/04/25 17:42:52 UTC

[GitHub] [beam] Abacn commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1176791404


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -241,24 +187,22 @@ def open(
     Raises:
       ValueError: Invalid open file mode.
     """
+    bucket_name, blob_name = parse_gcs_path(filename)
+    bucket = self.client.get_bucket(bucket_name)
+
     if mode == 'r' or mode == 'rb':
-      downloader = GcsDownloader(
-          self.client,
-          filename,
-          buffer_size=read_buffer_size,
-          get_project_number=self.get_project_number)
-      return io.BufferedReader(
-          DownloaderStream(
-              downloader, read_buffer_size=read_buffer_size, mode=mode),
-          buffer_size=read_buffer_size)
+      blob = bucket.get_blob(blob_name)
+      return storage.fileio.BlobReader(blob, chunk_size=read_buffer_size)

Review Comment:
   Should this be `from google.cloud.storage.fileio import BlobReader`?
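   
   For example, a minimal sketch of the import style I have in mind (assuming `bucket`, `blob_name`, and `read_buffer_size` as in the diff above):
   
   ```python
   from google.cloud.storage.fileio import BlobReader
   
   def _open_for_read(bucket, blob_name, read_buffer_size):
     # get_blob() returns None when the object does not exist; the caller
     # would still need to handle that case.
     blob = bucket.get_blob(blob_name)
     return BlobReader(blob, chunk_size=read_buffer_size)
   ```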



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   This is in a batch context, where presumably the queued requests are executed in the `with` block's `__exit__`. Have you verified whether a NotFound exception is raised here inside the `with` block or in `__exit__`?
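   
   One way to check (a sketch, not code from this PR; `client`, `paths`, and `parse_gcs_path` as in the diff): the queued calls normally only go out when the batch context manager exits, so a per-call `except` inside the loop may never fire:
   
   ```python
   from google.api_core.exceptions import NotFound
   
   try:
     with client.batch():
       for path in paths:
         bucket_name, blob_path = parse_gcs_path(path)
         # delete() is only queued here; the HTTP request is sent on exit.
         client.bucket(bucket_name).blob(blob_path).delete()
   except NotFound:
     # If the 404 shows up here, it was raised by the batch __exit__,
     # not by the blob.delete() call inside the loop.
     print('NotFound surfaced from the batch __exit__')
   ```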



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   We may still need the `MAX_BATCH_OPERATION_SIZE` chunking logic, as the documentation states "No more than 100 calls should be included in a single batch request.": https://cloud.google.com/storage/docs/batch#batch-example-request
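   
   A sketch of keeping the old chunking on top of the new client (reusing `parse_gcs_path` and `MAX_BATCH_OPERATION_SIZE` from this module):
   
   ```python
   from itertools import islice
   
   def _delete_in_chunks(client, paths):
     paths = iter(paths)
     while True:
       # The batch endpoint accepts at most 100 calls per request, so send
       # the deletes in chunks of MAX_BATCH_OPERATION_SIZE.
       chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
       if not chunk:
         break
       with client.batch():
         for path in chunk:
           bucket_name, blob_path = parse_gcs_path(path)
           client.bucket(bucket_name).blob(blob_path).delete()
   ```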



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()
+        except Exception as err:
+          if err is NotFound:
             exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+          else:
+            exception = err
+        finally:
+          result_statuses.append((path, exception))
+
+      return result_statuses
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,

Review Comment:
   Just to confirm: is it the case that kms_key support is now handled in gcsio's constructor and no longer needed here?
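   
   For reference, the GCS client also lets the key be attached to the destination blob and honored by `rewrite()`; a hypothetical sketch (names and key path are placeholders, not checked against this PR):
   
   ```python
   # Hypothetical: attach the CMEK to the destination object rather than
   # passing dest_kms_key_name on each copy call.
   dest_blob = storage.Blob(
       dest_blob_name,
       dest_bucket,
       kms_key_name='projects/<p>/locations/<l>/keyRings/<r>/cryptoKeys/<k>')
   token = None
   while True:
     # rewrite() copies in chunks; loop until the rewrite token is exhausted.
     token, _, _ = dest_blob.rewrite(src_blob, token=token)
     if token is None:
       break
   ```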



##########
sdks/python/apache_beam/io/gcp/gcsio_integration_test.py:
##########
@@ -168,33 +139,6 @@ def _test_copy_batch(
   def test_copy_batch(self):
     self._test_copy_batch("test_copy_batch")
 
-  @pytest.mark.it_postcommit

Review Comment:
   Can we keep these tests? They run against a real KMS key (unlike the unit tests).



##########
sdks/python/apache_beam/io/filesystem.py:
##########
@@ -219,28 +219,33 @@ def _initialize_compressor(self):
 
   def readable(self):
     # type: () -> bool
-    mode = self._file.mode
-    return 'r' in mode or 'a' in mode
+    try:
+      return 'r' in self._file.mode or 'a' in self._file.mode
+    except AttributeError:

Review Comment:
   Why could there be an AttributeError here? Is it that `self._file` does not have a `mode` attribute? If so, one could wrap `self._file` in a class that exposes these compatible attributes.
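   
   For example, a thin wrapper along these lines (hypothetical name) would avoid a try/except on every call:
   
   ```python
   class _FileWithMode(object):
     """Hypothetical shim giving a file-like object a 'mode' attribute."""
     def __init__(self, fileobj):
       self._fileobj = fileobj
       if not hasattr(fileobj, 'mode'):
         # Derive a mode string from the readable()/writable() capabilities.
         self.mode = ('r' if fileobj.readable() else '') + (
             'w' if fileobj.writable() else '')
   
     def __getattr__(self, name):
       # Delegate everything else to the wrapped file object.
       return getattr(self._fileobj, name)
   ```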



##########
sdks/python/apache_beam/examples/complete/game/user_score.py:
##########
@@ -177,12 +178,20 @@ def format_user_score_sums(user_score):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-    (  # pylint: disable=expression-not-assigned
-        p
-        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
-        | 'UserScore' >> UserScore()
-        | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
-        | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
+    try:

Review Comment:
   I take it the changes here, and to many of the tests, are not meant to be final?
   
   If an exception is thrown at pipeline expansion time, the pipeline is incomplete.



##########
sdks/python/apache_beam/io/filesystem.py:
##########
@@ -219,28 +219,33 @@ def _initialize_compressor(self):
 
   def readable(self):
     # type: () -> bool
-    mode = self._file.mode
-    return 'r' in mode or 'a' in mode
+    try:
+      return 'r' in self._file.mode or 'a' in self._file.mode
+    except AttributeError:
+      return self._file.readable is True
 
   def writeable(self):
     # type: () -> bool
-    mode = self._file.mode
-    return 'w' in mode or 'a' in mode
+    try:
+      return 'w' in self._file.mode or 'a' in self._file.mode
+    except AttributeError:
+      return self._file.writable is True
 
   def write(self, data):
     # type: (bytes) -> None
 
     """Write data to file."""
     if not self._compressor:
-      raise ValueError('compressor not initialized')
+      self._initialize_compressor()

Review Comment:
   `self._initialize_compressor` is already called in the constructor when the file is writable. If this code path is hit, something is wrong with the writeable() logic (see above).
   
   In general, it should not be necessary to change the logic of filesystem.py here.



##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -119,7 +122,7 @@ class _Credentials(object):
 
   @classmethod
   def get_service_credentials(cls, pipeline_options):
-    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
+    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]

Review Comment:
   What caused the return type change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org